[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r239694008

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -243,10 +248,19 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
  * Represents a partitioning where rows are split across partitions based on some total ordering of
  * the expressions specified in `ordering`. When data is partitioned in this manner the following
  * two conditions are guaranteed to hold:
- *  - All row where the expressions in `ordering` evaluate to the same values will be in the same
--- End diff --

nit: "row" -> "rows", "where... `ordering`" -> "whose `ordering` expressions"

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r239693849

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -243,10 +248,19 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
  * Represents a partitioning where rows are split across partitions based on some total ordering of
  * the expressions specified in `ordering`. When data is partitioned in this manner the following
--- End diff --

nit: add "," after "this manner".
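A small Scala sketch over plain collections can illustrate two things: the first guarantee quoted in the `RangePartitioning` doc comment, and the fact that range partitions respect the total ordering. This is a hypothetical model, not Spark's implementation. It partitions a single `Int` key against a hard-coded vector of upper bounds, whereas a real system derives the bounds from the data. The names `bounds`, `partitionFor` and `partitionsAreOrdered` are illustrative.

```scala
object RangePartitionSketch {
  // Inclusive upper bounds for every partition except the last one.
  // Hard-coded for illustration; a real system derives them from the data.
  val bounds: Vector[Int] = Vector(10, 20)

  // Partition index = position of the first bound >= key; keys above all bounds go last.
  def partitionFor(key: Int): Int = {
    val idx = bounds.indexWhere(b => key <= b)
    if (idx == -1) bounds.length else idx
  }

  // Assignment depends only on the key value, so equal keys always share a partition.
  def partition(keys: Seq[Int]): Map[Int, Seq[Int]] =
    keys.groupBy(partitionFor)

  // Every key in partition i is <= every key in partition i + 1
  // (groupBy only produces entries for non-empty partitions).
  def partitionsAreOrdered(parts: Map[Int, Seq[Int]]): Boolean = {
    val ordered = parts.toSeq.sortBy(_._1).map(_._2)
    ordered.zip(ordered.drop(1)).forall { case (a, b) => a.max <= b.min }
  }
}
```

For example, both `5`s in `Seq(5, 25, 10, 15, 5, 21, 20)` land in partition 0. The three resulting partitions (`5, 10, 5`, `15, 20` and `25, 21`) do not overlap.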
[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r239689874

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -22,13 +22,12 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
 /**
  * Specifies how tuples that share common expressions will be distributed when a query is executed
- * in parallel on many machines. Distribution can be used to refer to two distinct physical
- * properties:
- *  - Inter-node partitioning of data: In this case the distribution describes how tuples are
- *    partitioned across physical machines in a cluster. Knowing this property allows some
- *    operators (e.g., Aggregate) to perform partition local operations instead of global ones.
- *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
- *    about how tuples are distributed within a single partition.
+ * in parallel on many machines.
+ *
+ * Distribution here refers to inter-node partitioning of data:
+ * The distribution describes how tuples are partitioned across physical machines in a cluster.
+ * Knowing this property allows some operators (e.g., Aggregate) to perform partition local
+ * operations instead of global ones.
  */
--- End diff --

Yes, I understand that partitioning has nothing to do with intra-partition ordering, and it was wrong to include intra-partition ordering as part of the distribution properties. But I thought mentioning ordering as a side note would help people better understand how some operators work. Or maybe this is not the best place to put it.
[GitHub] spark pull request #23249: [SPARK-26297][SQL] improve the doc of Distributio...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/23249#discussion_r239540987

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -22,13 +22,12 @@ import org.apache.spark.sql.types.{DataType, IntegerType}
 /**
  * Specifies how tuples that share common expressions will be distributed when a query is executed
- * in parallel on many machines. Distribution can be used to refer to two distinct physical
- * properties:
- *  - Inter-node partitioning of data: In this case the distribution describes how tuples are
- *    partitioned across physical machines in a cluster. Knowing this property allows some
- *    operators (e.g., Aggregate) to perform partition local operations instead of global ones.
- *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
- *    about how tuples are distributed within a single partition.
+ * in parallel on many machines.
+ *
+ * Distribution here refers to inter-node partitioning of data:
+ * The distribution describes how tuples are partitioned across physical machines in a cluster.
+ * Knowing this property allows some operators (e.g., Aggregate) to perform partition local
+ * operations instead of global ones.
  */
--- End diff --

Should we also mention that there is another related but orthogonal physical property, i.e., intra-partition ordering? We could also give an example here of how operators take advantage of these two physical properties together.
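A hypothetical Scala sketch using plain collections (not Spark's physical operators) shows how an operator can combine the two orthogonal properties discussed above. Hash-partitioning by key (the distribution) puts all rows of a group in one partition. Sorting each partition by key (the intra-partition ordering) then lets a sort-based aggregation emit each group as soon as the key changes, without a hash table. The object and function names, and the use of `hashCode` as the hash function, are assumptions made for illustration.

```scala
object DistributionAndOrderingSketch {
  type Row = (String, Int) // (grouping key, value)

  // Distribution: rows with the same key end up in the same partition.
  def hashPartition(rows: Seq[Row], numPartitions: Int): Vector[Seq[Row]] = {
    val buckets = rows.groupBy { case (k, _) => Math.floorMod(k.hashCode, numPartitions) }
    Vector.tabulate(numPartitions)(i => buckets.getOrElse(i, Seq.empty))
  }

  // Intra-partition ordering: each partition is sorted by key independently.
  def sortWithinPartitions(parts: Vector[Seq[Row]]): Vector[Seq[Row]] =
    parts.map(_.sortBy(_._1))

  // Sort-based aggregation: it only compares a row with the previous group,
  // which is correct only if equal keys are adjacent within one partition.
  def streamingSum(sortedPartition: Seq[Row]): List[(String, Int)] =
    sortedPartition.foldLeft(List.empty[(String, Int)]) {
      case ((k, acc) :: rest, (key, v)) if k == key => (k, acc + v) :: rest
      case (out, (key, v))                          => (key, v) :: out
    }.reverse

  // Both properties together give a correct global result with purely local work.
  def aggregate(rows: Seq[Row], numPartitions: Int): Map[String, Int] =
    sortWithinPartitions(hashPartition(rows, numPartitions)).flatMap(streamingSum).toMap
}
```

Dropping either step breaks the result. Without the sort, `streamingSum(Seq(("a", 1), ("b", 2), ("a", 3)))` emits `"a"` twice. Without the hash partitioning, a key could be split across partitions.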
[GitHub] spark issue #23036: [SPARK-26065][SQL] Change query hint from a `LogicalPlan...
Github user maryannxue commented on the issue:

https://github.com/apache/spark/pull/23036

@gatorsmile @cloud-fan @rxin @juliuszsompolski
[GitHub] spark pull request #23036: [SPARK-26065][SQL] Change query hint from a `Logi...
GitHub user maryannxue opened a pull request:

https://github.com/apache/spark/pull/23036

[SPARK-26065][SQL] Change query hint from a `LogicalPlan` to a field

## What changes were proposed in this pull request?

The existing query hint implementation relies on a logical plan node `ResolvedHint` to store query hints in logical plans, and on `Statistics` in physical plans. `ResolvedHint` is not really a logical operator and can break the pattern matching of existing and future optimization rules. It is therefore an issue for the Optimizer, just as the old `AnalysisBarrier` was for the Analyzer.

All our query hints are either 1) a join hint, i.e., a broadcast hint, or 2) a re-partition hint, which is indeed an operator. Given that, we only need to add a hint field to the `Join` plan, which is a good enough solution for current hint usage.

This PR lets the `Join` node have one hint for its left sub-tree and another for its right sub-tree. Each hint is the merged result of all the effective hints specified in the corresponding sub-tree. The "effectiveness" of a hint, i.e., whether that hint should be propagated to the `Join` node, is currently consistent with the hint propagation rules originally implemented in the `Statistics` approach.

Note that the `ResolvedHint` node still has to live through the analysis stage because of the `Dataset` interface. In the "pre-optimization" stage it is removed, and its hint is moved to the `Join` node.

## How was this patch tested?

Added a `JoinHintSuite`.
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/maryannxue/spark query-hint

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23036.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #23036

commit fce106da44310eea8ede8a74634abca002015d4f
Author: maryannxue
Date:   2018-11-14T17:02:17Z

    [SPARK-26065][SQL] Change query hint from a LogicalPlan to a field

commit 785a4235a5026a777819faea06066dbe041bf819
Author: maryannxue
Date:   2018-11-14T18:55:02Z

    [SPARK-26065][SQL] Change query hint from a LogicalPlan to a field
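The following is a minimal, hypothetical Scala sketch of the idea described in this PR. It uses a toy plan ADT rather than Spark's real `LogicalPlan`, `ResolvedHint` or `Join` classes. A "pre-optimization" pass strips `ResolvedHint` nodes that sit directly on top of a join input and records them in a hint field on the `Join` node instead.

Several simplifying assumptions apply. The class names are illustrative. The only hint is a single `broadcast` flag. Only hints stacked directly on a join input count as effective. The PR's actual effectiveness rules follow the earlier `Statistics`-based propagation.

```scala
object JoinHintSketch {
  final case class HintInfo(broadcast: Boolean)
  final case class JoinHint(left: Option[HintInfo], right: Option[HintInfo])

  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class ResolvedHint(child: Plan, hint: HintInfo) extends Plan
  final case class Join(left: Plan, right: Plan, hint: JoinHint = JoinHint(None, None)) extends Plan

  // Merge two optional hints for the same side: broadcast wins if either asks for it.
  private def merge(a: Option[HintInfo], b: Option[HintInfo]): Option[HintInfo] =
    (a, b) match {
      case (Some(x), Some(y)) => Some(HintInfo(x.broadcast || y.broadcast))
      case _                  => a.orElse(b)
    }

  // Peel off ResolvedHint nodes stacked directly on top of a plan.
  private def extractHint(plan: Plan): (Plan, Option[HintInfo]) = plan match {
    case ResolvedHint(child, h) =>
      val (bare, inner) = extractHint(child)
      (bare, merge(inner, Some(h)))
    case other => (other, None)
  }

  // Move hints into Join nodes; drop any ResolvedHint that is not on a join input.
  def eliminateResolvedHint(plan: Plan): Plan = plan match {
    case Join(l, r, existing) =>
      val (bareL, lh) = extractHint(l)
      val (bareR, rh) = extractHint(r)
      Join(
        eliminateResolvedHint(bareL),
        eliminateResolvedHint(bareR),
        JoinHint(merge(existing.left, lh), merge(existing.right, rh)))
    case ResolvedHint(child, _) => eliminateResolvedHint(child)
    case leaf => leaf
  }

  // Example: broadcast hint on the left input of a join.
  val example: Plan =
    Join(ResolvedHint(Relation("a"), HintInfo(broadcast = true)), Relation("b"))
}
```

The pass merges any hint already present on the `Join`, so running it a second time leaves the plan unchanged. After the rewrite, optimizer rules that pattern-match on `Join` no longer have to look through hint nodes.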
[GitHub] spark pull request #22060: [DO NOT MERGE][TEST ONLY] Add once-policy rule ch...
Github user maryannxue closed the pull request at:

https://github.com/apache/spark/pull/22060
[GitHub] spark issue #22060: [DO NOT MERGE][TEST ONLY] Add once-policy rule check
Github user maryannxue commented on the issue:

https://github.com/apache/spark/pull/22060

Thank you for reminding me, @HyukjinKwon! And thanks to @mgaido91's contribution, this has been fixed already.
[GitHub] spark pull request #22778: [SPARK-25784][SQL] Infer filters from constraints...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22778#discussion_r229356678

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -171,10 +171,13 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
     // "Extract PythonUDF From JoinCondition".
     Batch("Check Cartesian Products", Once,
       CheckCartesianProducts) :+
-    Batch("RewriteSubquery", Once,
+    Batch("Rewrite Subquery", Once,
--- End diff --

@gatorsmile I think @dilipbiswal's suggestion is the right way to go.
[GitHub] spark issue #21156: [SPARK-24087][SQL] Avoid shuffle when join keys are a su...
Github user maryannxue commented on the issue:

https://github.com/apache/spark/pull/21156

The idea is good. Is it possible to make it an optimization rule? Another suggestion: we need more test cases.
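The following hypothetical sketch shows when an equi-join whose inputs are already hash-partitioned on part of their join keys can skip a shuffle. It works over plain Scala values and is not this PR's implementation or Spark's planner logic. Rows with equal join keys are already co-located under three conditions: both sides use the same hash function (assumed here), both have the same number of partitions, and the partitioning columns sit at the same join-key positions on both sides. `HashPartitioned` and `canSkipShuffle` are illustrative names.

```scala
object JoinShuffleSketch {
  // Hypothetical description of how one join input is currently partitioned.
  final case class HashPartitioned(columns: Seq[String], numPartitions: Int)

  // Positions of the partitioning columns within the join keys, if all of them are join keys.
  private def keyPositions(joinKeys: Seq[String], p: HashPartitioned): Option[Seq[Int]] = {
    val positions = p.columns.map(c => joinKeys.indexOf(c))
    if (positions.contains(-1)) None else Some(positions)
  }

  // Equal join keys imply equal values in the partitioning columns, hence the same
  // partition on both sides, provided positions and partition counts line up.
  def canSkipShuffle(
      leftKeys: Seq[String], left: HashPartitioned,
      rightKeys: Seq[String], right: HashPartitioned): Boolean =
    left.numPartitions == right.numPartitions &&
      left.columns.nonEmpty &&
      keyPositions(leftKeys, left).exists(lp => keyPositions(rightKeys, right).contains(lp))
}
```

Take join keys `(a, b) = (x, y)`. If the left side is partitioned on `a` and the right side on `x` with equal partition counts, the sides are co-partitioned. Partitioning the right side on `y` instead is not enough, because `a` and `y` are different key positions.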
[GitHub] spark issue #21156: [SPARK-24087][SQL] Avoid shuffle when join keys are a su...
Github user maryannxue commented on the issue:

https://github.com/apache/spark/pull/21156

Sorry for the delay. I'll take another look today.

On Mon, Oct 22, 2018 at 7:50 AM UCB AMPLab wrote:

> Can one of the admins verify this patch?
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r226527439

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -932,6 +935,23 @@ trait ScalaReflection {
     tpe.dealias.erasure.typeSymbol.asClass.fullName
   }
+  /**
+   * Returns the nullability of the input parameter types of the scala function object.
+   *
+   * Note that this only works with Scala 2.11, and the information returned may be inaccurate if
+   * used with a different Scala version.
--- End diff --

The argument here is that it's not necessarily wrong when using Scala 2.12: if all inputs are of boxed types, the result can still be correct. I think it's enough to say "we don't support it; switch to the new interface, otherwise we can't guarantee correctness."
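A rough, hypothetical sketch can show the kind of reflection-based check discussed here, using only Java reflection (this is not Spark's `ScalaReflection` code). A primitive parameter class such as `long` cannot hold null and needs a null check; a boxed or reference class can. How a function object's `apply` methods look through reflection depends on how the compiler encoded the function. If only erased `Object` parameters are visible, as may happen with Scala 2.12 lambdas, every parameter looks nullable. As argued above, that answer is still correct when all inputs really are boxed types; it only misses the check for primitive ones. The helper names are assumptions.

```scala
object ParamNullabilitySketch {
  // true = the parameter type can hold null (reference or boxed type);
  // false = primitive, so a null input would have to become a zero value.
  def nullableParams(paramTypes: Seq[Class[_]]): Seq[Boolean] =
    paramTypes.map(c => !c.isPrimitive)

  // Parameter types of the non-bridge `apply` methods of a function object.
  // The result depends on how the Scala compiler encoded the function
  // (anonymous class vs. LambdaMetafactory), so it is version-sensitive.
  def applyParamTypes(f: AnyRef): Seq[Seq[Class[_]]] =
    f.getClass.getMethods.toSeq
      .filter(m => m.getName == "apply" && !m.isBridge)
      .map(m => m.getParameterTypes.toSeq)
}
```

`nullableParams` gives a definite answer once the parameter classes are known. What `applyParamTypes` returns for a particular lambda is the version-dependent part.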
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r226521257

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -932,6 +935,23 @@ trait ScalaReflection {
     tpe.dealias.erasure.typeSymbol.asClass.fullName
   }
+  /**
+   * Returns the nullability of the input parameter types of the scala function object.
+   *
+   * Note that this only works with Scala 2.11, and the information returned may be inaccurate if
+   * used with a different Scala version.
--- End diff --

I think it simply returns if it goes through the code path below. I should probably make the Javadoc clearer.
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r226384713

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala ---
@@ -39,29 +42,29 @@ import org.apache.spark.sql.types.DataType
  * @param nullable True if the UDF can return null value.
  * @param udfDeterministic True if the UDF is deterministic. Deterministic UDF returns same result
  * each time it is invoked with a particular input.
- * @param nullableTypes which of the inputTypes are nullable (i.e. not primitive)
  */
 case class ScalaUDF(
     function: AnyRef,
     dataType: DataType,
     children: Seq[Expression],
+    inputsNullSafe: Seq[Boolean],
     inputTypes: Seq[DataType] = Nil,
     udfName: Option[String] = None,
     nullable: Boolean = true,
-    udfDeterministic: Boolean = true,
-    nullableTypes: Seq[Boolean] = Nil)
+    udfDeterministic: Boolean = true)
   extends Expression with ImplicitCastInputTypes with NonSQLExpression with UserDefinedExpression {
   // The constructor for SPARK 2.1 and 2.2
   def this(
       function: AnyRef,
       dataType: DataType,
       children: Seq[Expression],
+      inputsNullSafe: Seq[Boolean],
--- End diff --

We'll use the Scala 2.11 approach in places where `nullableTypes` info is unavailable, so as to at least keep legacy usage of `ScalaUDF` working.
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r226156109

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala ---
@@ -73,19 +73,21 @@ case class UserDefinedFunction protected[sql] (
    */
   @scala.annotation.varargs
   def apply(exprs: Column*): Column = {
-    if (inputTypes.isDefined && nullableTypes.isDefined) {
-      require(inputTypes.get.length == nullableTypes.get.length)
+    val numOfArgs = ScalaReflection.getParameterCount(f)
--- End diff --

Actually, I think the "assert" would be better placed in `ScalaUDF`, but then the behavior would differ slightly from the current one. I'm keeping everything else as it is.
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r226155205

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala ---
@@ -393,4 +393,30 @@ class UDFSuite extends QueryTest with SharedSQLContext {
       checkAnswer(df, Seq(Row("12"), Row("24"), Row("3null"), Row(null)))
     }
   }
+  test("SPARK-25044 Verify null input handling for primitive types - with udf()") {
+    val udf1 = udf({(x: Long, y: Any) => x * 2 + (if (y == null) 1 else 0)})
+    val df = spark.range(0, 3).toDF("a")
+      .withColumn("b", udf1($"a", lit(null)))
+      .withColumn("c", udf1(lit(null), $"a"))
+
+    checkAnswer(
+      df,
+      Seq(
+        Row(0, 1, null),
+        Row(1, 3, null),
+        Row(2, 5, null)))
+  }
+
+  test("SPARK-25044 Verify null input handling for primitive types - with udf.register") {
+    withTable("t") {
--- End diff --

I tried this first, but it seems `toDF()` couldn't deduce the right type when nulls are present.
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r225986215

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala ---
@@ -179,7 +179,8 @@ class DynamicPartitionDataWriter(
         val partitionName = ScalaUDF(
           ExternalCatalogUtils.getPartitionPathString _,
           StringType,
-          Seq(Literal(c.name), Cast(c, StringType, Option(description.timeZoneId))))
+          Seq(Literal(c.name), Cast(c, StringType, Option(description.timeZoneId))),
+          Seq(false, false))
--- End diff --

I don't think we need to do that for required arguments, or do we?
[GitHub] spark issue #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF constructor sig...
Github user maryannxue commented on the issue:

https://github.com/apache/spark/pull/22732

@srowen What @cloud-fan described is a change introduced in #22259. We can fix it by keeping each call to `ScalaReflection.schemaFor` in its own `Try` block.

As for `UserDefinedFunction`, after offline discussions with @cloud-fan, we decided that the constructor should never be called without setting `nullableTypes`, so we'll just assert `nullableTypes.length == exprs.length`.

I'll go ahead, fix these two items and update the PR.
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r225762708

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala ---
@@ -81,11 +81,11 @@ case class UserDefinedFunction protected[sql] (
       f,
       dataType,
       exprs.map(_.expr),
+      nullableTypes.map(_.map(!_)).getOrElse(exprs.map(_ => false)),
--- End diff --

I just pointed out one case: we try to get `inputSchemas` through `ScalaReflection.schemaFor` and get an exception for unrecognized types. There is another case where `nullableTypes` could be unspecified: when `UserDefinedFunction` is instantiated by calling the constructor rather than the `create` method. I then assume it was created by an earlier version. In that case we should use the old logic, i.e., `ScalaReflection.getParameterTypes` (https://github.com/apache/spark/pull/22259/files#diff-57b3d87be744b7d79a9beacf8e5e5eb2L2153), to get the correct information for `nullableTypes`. Is that right, @cloud-fan @srowen?
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r225724505

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala ---
@@ -81,11 +81,11 @@ case class UserDefinedFunction protected[sql] (
       f,
       dataType,
       exprs.map(_.expr),
+      nullableTypes.map(_.map(!_)).getOrElse(exprs.map(_ => false)),
--- End diff --

It looks like the only place where we'd get an unspecified `inputSchemas` is when `ScalaReflection.schemaFor` doesn't recognize a type and throws an exception (https://github.com/apache/spark/blob/1fd59c129a7aa16f9960b109128b166952992f32/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L786). The caller seems to do a poor job by calling it this way, for example:

```
val inputSchemas = Try(ScalaReflection.schemaFor(typeTag[A1]) ::
  ScalaReflection.schemaFor(typeTag[A2]) :: Nil).toOption
val udf = SparkUserDefinedFunction.create(f, dataType, inputSchemas)
```

This means that if the type of just one of the parameters is unrecognizable by `ScalaReflection`, the entire `Seq` ends up as `None`. I think it's fine not to check null for user-defined types we don't know, because they can't be primitive types anyway. But I do think we should make the type inference of each parameter independent, so that we still handle the nulls that need to be taken care of.
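A self-contained sketch shows the difference between one `Try` around the whole list and one `Try` per parameter. `schemaFor` here is a hypothetical stand-in for `ScalaReflection.schemaFor`. It recognizes only a couple of runtime classes and throws for anything else. The `Schema` case class is likewise an assumption, not Spark's type.

```scala
import scala.util.Try

object PerParameterTrySketch {
  final case class Schema(typeName: String, nullable: Boolean)

  // Hypothetical stand-in for ScalaReflection.schemaFor: throws for unknown types.
  def schemaFor(c: Class[_]): Schema =
    if (c == classOf[Long]) Schema("bigint", nullable = false)
    else if (c == classOf[String]) Schema("string", nullable = true)
    else throw new UnsupportedOperationException(s"Schema for type $c is not supported")

  // One Try around the whole list: a single unknown type loses all the information.
  def allOrNothing(params: Seq[Class[_]]): Option[Seq[Schema]] =
    Try(params.map(schemaFor)).toOption

  // One Try per parameter: known parameters keep their schema (and nullability).
  def perParameter(params: Seq[Class[_]]): Seq[Option[Schema]] =
    params.map(c => Try(schemaFor(c)).toOption)
}
```

Take a UDF over `(Long, java.util.UUID)`. With the per-parameter version, the primitive `Long` input still reports `nullable = false` and can get its null check. The unrecognized type, which cannot be primitive, simply gets no special handling.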
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r225619242

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala ---
@@ -31,6 +31,7 @@ import org.apache.spark.sql.types.DataType
  * null. Use boxed type or [[Option]] if you wanna do the null-handling yourself.
  * @param dataType Return type of function.
  * @param children The input expressions of this UDF.
+ * @param handleNullForInputs Whether the inputs need null-value handling respectively.
--- End diff --

Makes sense. Since I'm also using this flag to replace `KnownNotNull`, would `inputsNullSafe` be a better name?
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r225606907

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala ---
@@ -39,29 +40,29 @@ import org.apache.spark.sql.types.DataType
  * @param nullable True if the UDF can return null value.
  * @param udfDeterministic True if the UDF is deterministic. Deterministic UDF returns same result
  * each time it is invoked with a particular input.
- * @param nullableTypes which of the inputTypes are nullable (i.e. not primitive)
  */
 case class ScalaUDF(
     function: AnyRef,
     dataType: DataType,
     children: Seq[Expression],
+    handleNullForInputs: Seq[Boolean],
     inputTypes: Seq[DataType] = Nil,
     udfName: Option[String] = None,
     nullable: Boolean = true,
-    udfDeterministic: Boolean = true,
-    nullableTypes: Seq[Boolean] = Nil)
+    udfDeterministic: Boolean = true)
   extends Expression with ImplicitCastInputTypes with NonSQLExpression with UserDefinedExpression {
   // The constructor for SPARK 2.1 and 2.2
   def this(
       function: AnyRef,
       dataType: DataType,
       children: Seq[Expression],
+      handleNullForInputs: Seq[Boolean],
--- End diff --

I just realized this already differs from your original PR: @cloud-fan did a follow-up PR adding `nullableTypes`. I'll revert the change here.
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r225605581

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala ---
@@ -73,27 +73,27 @@ case class UserDefinedFunction protected[sql] (
    */
   @scala.annotation.varargs
   def apply(exprs: Column*): Column = {
-    if (inputTypes.isDefined && nullableTypes.isDefined) {
-      require(inputTypes.get.length == nullableTypes.get.length)
+    if (inputTypes.isDefined && handleNullForInputs.isDefined) {
+      require(inputTypes.get.length == handleNullForInputs.get.length)
     }
     Column(ScalaUDF(
       f,
       dataType,
       exprs.map(_.expr),
+      handleNullForInputs.getOrElse(exprs.map(_ => false)),
--- End diff --

I'd actually expect the default behavior to be the other way around. If we don't know, we just do the if-else null handling, and that wouldn't do us any harm correctness-wise, right? Anyway, I'm not going to change that in this PR, but I hope we can figure out if and when this default behavior kicks in.
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r225588931

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala ---
@@ -73,27 +73,27 @@ case class UserDefinedFunction protected[sql] (
    */
   @scala.annotation.varargs
   def apply(exprs: Column*): Column = {
-    if (inputTypes.isDefined && nullableTypes.isDefined) {
-      require(inputTypes.get.length == nullableTypes.get.length)
+    if (inputTypes.isDefined && handleNullForInputs.isDefined) {
+      require(inputTypes.get.length == handleNullForInputs.get.length)
     }
     Column(ScalaUDF(
       f,
       dataType,
       exprs.map(_.expr),
+      handleNullForInputs.getOrElse(exprs.map(_ => false)),
--- End diff --

I'm actually not so sure about this part; it is just there to be consistent with the behavior in your previous check-in. Can you give an example of end-user cases where these flags are unavailable or unspecified?
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r225587391

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala ---
@@ -31,6 +31,7 @@ import org.apache.spark.sql.types.DataType
  * null. Use boxed type or [[Option]] if you wanna do the null-handling yourself.
  * @param dataType Return type of function.
  * @param children The input expressions of this UDF.
+ * @param handleNullForInputs Whether the inputs need null-value handling respectively.
--- End diff --

Yes, it needs a little explanation here regarding primitive types. Since these types are not nullable, a null value usually ends up being represented as a zero value.
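The zero-value behavior mentioned above can be reproduced in plain Scala without Spark. A generic caller holds untyped inputs and must cast them to the function's parameter types. Casting `null` to a primitive `Long` yields `0L` rather than failing. `callUntyped` is a hypothetical model of such a caller. `guardedCall` is a hypothetical model of the null handling the analyzer adds: return null if any input that cannot accept null is null. Neither is Spark's implementation.

```scala
object PrimitiveNullSketch {
  // Same body as the UDF in the SPARK-25044 test earlier in this thread: x is a primitive Long.
  val udf1: (Long, Any) => Long = (x, y) => x * 2 + (if (y == null) 1 else 0)

  // A generic caller only has untyped inputs and casts them to the parameter types;
  // casting null to a primitive Long silently yields 0L.
  def callUntyped(a: Any, b: Any): Long = udf1(a.asInstanceOf[Long], b)

  // Hypothetical model of the added null handling: acceptsNull(i) is false for
  // primitive inputs, and a null value there makes the whole result null.
  def guardedCall(acceptsNull: Seq[Boolean])(a: Any, b: Any): Any = {
    val mustReturnNull =
      Seq(a, b).zip(acceptsNull).exists { case (v, ok) => v == null && !ok }
    if (mustReturnNull) null else callUntyped(a, b)
  }
}
```

Without the guard, `callUntyped(null, 5L)` returns `0` instead of null. This mirrors why the SPARK-25044 test expects column `c` to be null. With the guard, a null in the primitive position produces null, while a null in the `Any` position still reaches the function body.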
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r225586820

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala ---
@@ -314,24 +314,24 @@ class AnalysisSuite extends AnalysisTest with Matchers {
     }
     // non-primitive parameters do not need special null handling
-    val udf1 = ScalaUDF((s: String) => "x", StringType, string :: Nil)
+    val udf1 = ScalaUDF((s: String) => "x", StringType, string :: Nil, false :: Nil)
     val expected1 = udf1
     checkUDF(udf1, expected1)
     // only primitive parameter needs special null handling
     val udf2 = ScalaUDF((s: String, d: Double) => "x", StringType, string :: double :: Nil,
-      nullableTypes = true :: false :: Nil)
+      false :: true :: Nil)
     val expected2 =
-      If(IsNull(double), nullResult, udf2.copy(children = string :: KnownNotNull(double) :: Nil))
+      If(IsNull(double), nullResult, udf2.copy(handleNullForInputs = false :: false :: Nil))
--- End diff --

Yes, as explained in https://github.com/apache/spark/pull/22732#discussion_r225583730
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r225585971

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -2137,36 +2137,27 @@ class Analyzer(
       case p => p transformExpressionsUp {
-        case udf@ScalaUDF(func, _, inputs, _, _, _, _, nullableTypes) =>
-          if (nullableTypes.isEmpty) {
-            // If no nullability info is available, do nothing. No fields will be specially
-            // checked for null in the plan. If nullability info is incorrect, the results
-            // of the UDF could be wrong.
-            udf
-          } else {
-            // Otherwise, add special handling of null for fields that can't accept null.
-            // The result of operations like this, when passed null, is generally to return null.
-            assert(nullableTypes.length == inputs.length)
-
-            // TODO: skip null handling for not-nullable primitive inputs after we can completely
-            // trust the `nullable` information.
-            val needsNullCheck = (nullable: Boolean, expr: Expression) =>
-              nullable && !expr.isInstanceOf[KnownNotNull]
-            val inputsNullCheck = nullableTypes.zip(inputs)
-              .filter { case (nullableType, expr) => needsNullCheck(!nullableType, expr) }
-              .map { case (_, expr) => IsNull(expr) }
-              .reduceLeftOption[Expression]((e1, e2) => Or(e1, e2))
-            // Once we add an `If` check above the udf, it is safe to mark those checked inputs
-            // as not nullable (i.e., wrap them with `KnownNotNull`), because the null-returning
-            // branch of `If` will be called if any of these checked inputs is null. Thus we can
-            // prevent this rule from being applied repeatedly.
-            val newInputs = nullableTypes.zip(inputs).map { case (nullable, expr) =>
-              if (nullable) expr else KnownNotNull(expr)
-            }
-            inputsNullCheck
-              .map(If(_, Literal.create(null, udf.dataType), udf.copy(children = newInputs)))
-              .getOrElse(udf)
-          }
+        case udf @ ScalaUDF(_, _, inputs, handleNullForInputs, _, _, _, _)
+          if !handleNullForInputs.forall(!_) =>
--- End diff --

Good point.
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r225585740

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala ---
@@ -39,29 +40,29 @@ import org.apache.spark.sql.types.DataType
  * @param nullable True if the UDF can return null value.
  * @param udfDeterministic True if the UDF is deterministic. Deterministic UDF returns same result
  * each time it is invoked with a particular input.
- * @param nullableTypes which of the inputTypes are nullable (i.e. not primitive)
  */
 case class ScalaUDF(
     function: AnyRef,
     dataType: DataType,
     children: Seq[Expression],
+    handleNullForInputs: Seq[Boolean],
--- End diff --

Among other reasons, one argument for not merging the flags with the types is https://github.com/apache/spark/pull/22732#discussion_r225583730. Your second question has the same answer as above, plus https://github.com/apache/spark/pull/22259#discussion_r224252642.
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22732#discussion_r225583730 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2137,36 +2137,27 @@ class Analyzer( case p => p transformExpressionsUp { -case udf@ScalaUDF(func, _, inputs, _, _, _, _, nullableTypes) => - if (nullableTypes.isEmpty) { -// If no nullability info is available, do nothing. No fields will be specially -// checked for null in the plan. If nullability info is incorrect, the results -// of the UDF could be wrong. -udf - } else { -// Otherwise, add special handling of null for fields that can't accept null. -// The result of operations like this, when passed null, is generally to return null. -assert(nullableTypes.length == inputs.length) - -// TODO: skip null handling for not-nullable primitive inputs after we can completely -// trust the `nullable` information. -val needsNullCheck = (nullable: Boolean, expr: Expression) => - nullable && !expr.isInstanceOf[KnownNotNull] -val inputsNullCheck = nullableTypes.zip(inputs) - .filter { case (nullableType, expr) => needsNullCheck(!nullableType, expr) } - .map { case (_, expr) => IsNull(expr) } - .reduceLeftOption[Expression]((e1, e2) => Or(e1, e2)) -// Once we add an `If` check above the udf, it is safe to mark those checked inputs -// as not nullable (i.e., wrap them with `KnownNotNull`), because the null-returning -// branch of `If` will be called if any of these checked inputs is null. Thus we can -// prevent this rule from being applied repeatedly. 
-val newInputs = nullableTypes.zip(inputs).map { case (nullable, expr) => - if (nullable) expr else KnownNotNull(expr) -} -inputsNullCheck - .map(If(_, Literal.create(null, udf.dataType), udf.copy(children = newInputs))) - .getOrElse(udf) - } +case udf @ ScalaUDF(_, _, inputs, handleNullForInputs, _, _, _, _) + if !handleNullForInputs.forall(!_) => + // Otherwise, add special handling of null for fields that can't accept null. + // The result of operations like this, when passed null, is generally to return null. + assert(handleNullForInputs.length == inputs.length) + + // TODO: skip null handling for not-nullable primitive inputs after we can completely + // trust the `nullable` information. + val inputsNullCheck = handleNullForInputs.zip(inputs) --- End diff -- This should answer or confirm a couple of the other questions above. Since `ScalaUDF` already has the `handleNullForInputs` flag, we can take advantage of it in this rule as well. Say a `ScalaUDF` hits this rule for the first time with `handleNullForInputs` set to "false, true, false". We add null handling (an `If` clause) for the second input, which is flagged "true"; from that point on, all inputs are covered, so we can set every flag in the new `ScalaUDF`'s `handleNullForInputs` to "false". Even if the new `ScalaUDF` hits this rule a second time, nothing will be done. The "TODO" above should work the same way: if `handleNullForInputs` has a "true" flag but the corresponding expression is NOT nullable, we can skip the null handling for that input and still flag it as "false" in the new `ScalaUDF`.
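The flag-clearing idea above can be sketched with a toy expression tree. Everything here (`Expr`, `Udf`, `transformUp`, `applyRule`) is a hypothetical stand-in rather than Catalyst's API; it only illustrates why clearing all flags after adding the `If` makes a second pass a no-op.

```scala
// Toy expression tree; hypothetical stand-ins, not Catalyst classes.
sealed trait Expr
case class Attr(name: String) extends Expr
case class IsNull(child: Expr) extends Expr
case class Or(left: Expr, right: Expr) extends Expr
case object NullLit extends Expr
case class If(cond: Expr, trueValue: Expr, falseValue: Expr) extends Expr
// handleNullForInputs(i) == true means input i still needs a null check.
case class Udf(inputs: Seq[Expr], handleNullForInputs: Seq[Boolean]) extends Expr

// Minimal bottom-up traversal, in the spirit of transformExpressionsUp.
def transformUp(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr = {
  val withNewChildren = e match {
    case IsNull(c)   => IsNull(transformUp(c)(rule))
    case Or(l, r)    => Or(transformUp(l)(rule), transformUp(r)(rule))
    case If(c, t, f) => If(transformUp(c)(rule), transformUp(t)(rule), transformUp(f)(rule))
    case Udf(in, fl) => Udf(in.map(transformUp(_)(rule)), fl)
    case leaf        => leaf
  }
  rule.applyOrElse(withNewChildren, identity[Expr])
}

// Fires only while at least one flag is true (same as `!flags.forall(!_)`).
val handleNullInputs: PartialFunction[Expr, Expr] = {
  case udf @ Udf(inputs, flags) if flags.exists(identity) =>
    require(flags.length == inputs.length)
    val nullCheck = flags.zip(inputs)
      .collect { case (true, input) => IsNull(input): Expr }
      .reduceLeft[Expr](Or(_, _))
    // Every flagged input is now guarded by the If, so all flags are cleared.
    If(nullCheck, NullLit, udf.copy(handleNullForInputs = flags.map(_ => false)))
}

def applyRule(e: Expr): Expr = transformUp(e)(handleNullInputs)
```

Applying `applyRule` to its own output returns the same tree, which is the stabilization property the comment describes.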
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22732#discussion_r225580591 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2137,36 +2137,27 @@ class Analyzer( case p => p transformExpressionsUp { -case udf@ScalaUDF(func, _, inputs, _, _, _, _, nullableTypes) => - if (nullableTypes.isEmpty) { -// If no nullability info is available, do nothing. No fields will be specially -// checked for null in the plan. If nullability info is incorrect, the results -// of the UDF could be wrong. -udf - } else { -// Otherwise, add special handling of null for fields that can't accept null. -// The result of operations like this, when passed null, is generally to return null. -assert(nullableTypes.length == inputs.length) - -// TODO: skip null handling for not-nullable primitive inputs after we can completely -// trust the `nullable` information. -val needsNullCheck = (nullable: Boolean, expr: Expression) => - nullable && !expr.isInstanceOf[KnownNotNull] -val inputsNullCheck = nullableTypes.zip(inputs) - .filter { case (nullableType, expr) => needsNullCheck(!nullableType, expr) } - .map { case (_, expr) => IsNull(expr) } - .reduceLeftOption[Expression]((e1, e2) => Or(e1, e2)) -// Once we add an `If` check above the udf, it is safe to mark those checked inputs -// as not nullable (i.e., wrap them with `KnownNotNull`), because the null-returning -// branch of `If` will be called if any of these checked inputs is null. Thus we can -// prevent this rule from being applied repeatedly. 
-val newInputs = nullableTypes.zip(inputs).map { case (nullable, expr) => - if (nullable) expr else KnownNotNull(expr) -} -inputsNullCheck - .map(If(_, Literal.create(null, udf.dataType), udf.copy(children = newInputs))) - .getOrElse(udf) - } +case udf @ ScalaUDF(_, _, inputs, handleNullForInputs, _, _, _, _) + if !handleNullForInputs.forall(!_) => + // Otherwise, add special handling of null for fields that can't accept null. --- End diff -- I was wondering about that, too. Our Scala style check is sometimes confusing; let me double-check.
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
GitHub user maryannxue opened a pull request: https://github.com/apache/spark/pull/22732 [SPARK-25044][FOLLOW-UP] Change ScalaUDF constructor signature ## What changes were proposed in this pull request? This is a follow-up PR for #22259. The extra field added to `ScalaUDF` in the original PR was declared optional but should in fact be required; otherwise, callers of `ScalaUDF`'s constructor could omit this new field and get incorrect results. Meanwhile, now that we have this extra field indicating whether a null test should be applied to the corresponding input value, we can also use this flag to prevent the rule `HandleNullInputsForUDF` from being applied infinitely. ## How was this patch tested? Passed affected existing UTs: AnalysisSuite UDFSuite You can merge this pull request into a Git repository by running: $ git pull https://github.com/maryannxue/spark spark-25044-followup Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22732.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22732 commit 5cf3c898978388dc9700219aa6e82c24d5f52f33 Author: maryannxue Date: 2018-10-15T18:36:00Z Adjust ScalaUDF constructor signature
[GitHub] spark issue #22706: [SPARK-25716][SQL][MINOR] remove unnecessary collection ...
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/22706 @srowen I don't think this would make a big difference performance-wise, but if it's the right change, it just looks cleaner now. Does anyone have any idea why it wasn't like this before?
[GitHub] spark issue #22713: [SPARK-25691][SQL] Use semantic equality in AliasViewChi...
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/22713 We do need a test case here anyway. Ideally, it would be as simple as #22701, but the difficulty is in declaring a view.
[GitHub] spark pull request #22701: [SPARK-25690][SQL] Analyzer rule HandleNullInputs...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22701#discussion_r224658264 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2150,8 +2150,10 @@ class Analyzer( // TODO: skip null handling for not-nullable primitive inputs after we can completely // trust the `nullable` information. +val needsNullCheck = (nullable: Boolean, expr: Expression) => --- End diff -- Yes, that's because the meaning of "nullableType" is flipped here: it should really be "cantBeNull" or "doesntNeedNullCheck". I'll change this in another PR.
[GitHub] spark pull request #22701: [SPARK-25690][SQL] Analyzer rule HandleNullInputs...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22701#discussion_r224602234 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala --- @@ -351,8 +351,8 @@ class AnalysisSuite extends AnalysisTest with Matchers { test("SPARK-24891 Fix HandleNullInputsForUDF rule") { val a = testRelation.output(0) val func = (x: Int, y: Int) => x + y -val udf1 = ScalaUDF(func, IntegerType, a :: a :: Nil) -val udf2 = ScalaUDF(func, IntegerType, a :: udf1 :: Nil) +val udf1 = ScalaUDF(func, IntegerType, a :: a :: Nil, nullableTypes = false :: false :: Nil) --- End diff -- These are two separate issues. If `nullableTypes` is not specified here, `HandleNullInputsForUDF` will do nothing, which means null checks will be missed. That is a problem in itself, and one a user could potentially trigger. As for the test, if the rule does nothing, the "applied infinitely" bug cannot be reproduced. But the infinite-application issue is a theoretical one and is quite unlikely to have any end-user impact, thanks to @rxin's fix for SPARK-24865.
[GitHub] spark pull request #22259: [SPARK-25044][SQL] (take 2) Address translation o...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22259#discussion_r224527615 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala --- @@ -47,7 +48,8 @@ case class ScalaUDF( inputTypes: Seq[DataType] = Nil, udfName: Option[String] = None, nullable: Boolean = true, -udfDeterministic: Boolean = true) +udfDeterministic: Boolean = true, +nullableTypes: Seq[Boolean] = Nil) --- End diff -- Yes, I think this should be in 2.4, too, since it's an API change. BTW, I just finished https://github.com/apache/spark/pull/22701, which addresses suggested change 2 above. Changes 1 and 3 can then be covered in another PR.
[GitHub] spark pull request #22701: [SPARK-25690][SQL] Analyzer rule HandleNullInputs...
GitHub user maryannxue opened a pull request: https://github.com/apache/spark/pull/22701 [SPARK-25690][SQL] Analyzer rule HandleNullInputsForUDF does not stabilize and can be applied infinitely ## What changes were proposed in this pull request? The HandleNullInputsForUDF rule can generate new `If` nodes infinitely, causing problems such as SQL cache lookups failing to match. This was fixed in SPARK-24891 and then broken again by SPARK-25044. The unit test in `AnalysisSuite` added in SPARK-24891 should have failed but didn't, because it wasn't properly updated after the `ScalaUDF` constructor signature change. So this PR also updates the test to use the new `ScalaUDF` constructor. ## How was this patch tested? Updated the original UT. This is justified because the original UT became invalid after SPARK-25044. You can merge this pull request into a Git repository by running: $ git pull https://github.com/maryannxue/spark spark-25690 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22701.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22701 commit 736625b3f280dcd6caed83b3cec82b72d4f0c0fc Author: maryannxue Date: 2018-10-11T16:45:28Z [SPARK-25690][SQL] Analyzer rule HandleNullInputsForUDF does not stabilize and can be applied infinitely
[GitHub] spark pull request #22259: [SPARK-25044][SQL] (take 2) Address translation o...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22259#discussion_r224510115 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala --- @@ -47,7 +48,8 @@ case class ScalaUDF( inputTypes: Seq[DataType] = Nil, udfName: Option[String] = None, nullable: Boolean = true, -udfDeterministic: Boolean = true) +udfDeterministic: Boolean = true, +nullableTypes: Seq[Boolean] = Nil) --- End diff -- Yeah, understood and agreed. We'd probably need to change 3 things in follow-up PR(s): 1. Add `isInstanceOf[KnownNotNull]` back (I can do this). 2. Move `nullableTypes` up in the parameter list and make it required. I agree that it would only make things worse if anyone used this private API and forgot to set this parameter. 3. Flip the `true`/`false` meaning of `nullableTypes`. This is rather minor, but I think it helps with readability and consistency.
[GitHub] spark pull request #22259: [SPARK-25044][SQL] (take 2) Address translation o...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22259#discussion_r224295469 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala --- @@ -47,7 +48,8 @@ case class ScalaUDF( inputTypes: Seq[DataType] = Nil, udfName: Option[String] = None, nullable: Boolean = true, -udfDeterministic: Boolean = true) +udfDeterministic: Boolean = true, +nullableTypes: Seq[Boolean] = Nil) --- End diff -- Yes, the test should not pass after removing the `isInstanceOf[KnownNotNull]` condition from `needsNullCheck` (https://github.com/apache/spark/pull/22259/files#diff-57b3d87be744b7d79a9beacf8e5e5eb2L2160). The idea was to add a `KnownNotNull` node on top of the original node to mark it as null-checked, so the rule won't add redundant null checks even if it is accidentally applied again. I'm not sure exactly why you removed the `isInstanceOf[KnownNotNull]` condition in this PR, but I think it should stay there alongside the new nullable type check. After adding the `nullableTypes` parameter in the test, the issue can be reproduced:
```
test("SPARK-24891 Fix HandleNullInputsForUDF rule") {
  val a = testRelation.output(0)
  val func = (x: Int, y: Int) => x + y
  val udf1 = ScalaUDF(func, IntegerType, a :: a :: Nil, nullableTypes = false :: false :: Nil)
  val udf2 = ScalaUDF(func, IntegerType, a :: udf1 :: Nil, nullableTypes = false :: false :: Nil)
  val plan = Project(Alias(udf2, "")() :: Nil, testRelation)
  comparePlans(plan.analyze, plan.analyze.analyze)
}
```
BTW, I'm just curious: it looks like `nullableTypes` means the opposite of "nullable" as used in a schema. I would assume that when `nullableTypes` is `Seq(false)`, the type is not nullable and we need not add the null check, and vice versa. Did I miss something here?
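The `KnownNotNull` marker argument can be illustrated with a toy model that mirrors the two-argument test above. All names here are hypothetical stand-ins, not Catalyst classes; `guardKnownNotNull` toggles the `!expr.isInstanceOf[KnownNotNull]` condition so both behaviors can be compared.

```scala
// Toy expression tree; hypothetical stand-ins, not Catalyst classes.
sealed trait Expr
case class Attr(name: String) extends Expr
case class KnownNotNull(child: Expr) extends Expr
case class IsNull(child: Expr) extends Expr
case class Or(left: Expr, right: Expr) extends Expr
case object NullLit extends Expr
case class If(cond: Expr, trueValue: Expr, falseValue: Expr) extends Expr
// acceptsNull(i) == false: parameter i is primitive and cannot take null
// (the role `nullableTypes` plays in the diff above).
case class Udf(inputs: Seq[Expr], acceptsNull: Seq[Boolean]) extends Expr

def transformUp(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr = {
  val withNewChildren = e match {
    case KnownNotNull(c) => KnownNotNull(transformUp(c)(rule))
    case IsNull(c)       => IsNull(transformUp(c)(rule))
    case Or(l, r)        => Or(transformUp(l)(rule), transformUp(r)(rule))
    case If(c, t, f)     => If(transformUp(c)(rule), transformUp(t)(rule), transformUp(f)(rule))
    case Udf(in, ok)     => Udf(in.map(transformUp(_)(rule)), ok)
    case leaf            => leaf
  }
  rule.applyOrElse(withNewChildren, identity[Expr])
}

def handleNullInputs(guardKnownNotNull: Boolean): PartialFunction[Expr, Expr] = {
  case udf @ Udf(inputs, acceptsNull) =>
    def needsCheck(ok: Boolean, e: Expr): Boolean =
      !ok && !(guardKnownNotNull && e.isInstanceOf[KnownNotNull])
    val checks = acceptsNull.zip(inputs).collect {
      case (ok, e) if needsCheck(ok, e) => IsNull(e): Expr
    }
    if (checks.isEmpty) udf
    else {
      // Mark checked inputs so that a later pass can recognize them.
      val newInputs = acceptsNull.zip(inputs).map { case (ok, e) => if (ok) e else KnownNotNull(e) }
      If(checks.reduceLeft[Expr](Or(_, _)), NullLit, udf.copy(inputs = newInputs))
    }
}

def applyRule(e: Expr, guardKnownNotNull: Boolean): Expr =
  transformUp(e)(handleNullInputs(guardKnownNotNull))
```

With the guard, a second pass leaves the tree unchanged; without it, every pass wraps the inner UDF in yet another `If`.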
[GitHub] spark pull request #22259: [SPARK-25044][SQL] (take 2) Address translation o...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22259#discussion_r224252642 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala --- @@ -47,7 +48,8 @@ case class ScalaUDF( inputTypes: Seq[DataType] = Nil, udfName: Option[String] = None, nullable: Boolean = true, -udfDeterministic: Boolean = true) +udfDeterministic: Boolean = true, +nullableTypes: Seq[Boolean] = Nil) --- End diff -- I think the problem is more about the way we handle `nullableTypes` when it is not specified, as in https://github.com/apache/spark/pull/22259/files#diff-57b3d87be744b7d79a9beacf8e5e5eb2R2157. The test failure of https://github.com/apache/spark/pull/21851/files#diff-e8dddba2915a147970654aa93bee30a7R344 would have been exposed if `nullableTypes` had been updated in this PR. So I would say this parameter is logically required, but right now it is declared optional. In this case, things went wrong when `nullableTypes` was left unspecified, and this could happen not only in tests but in "source" too. I suggest we move this parameter up, right after `inputTypes`, so it gets the attention it needs.
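The optional-versus-required argument can be sketched with two simplified constructor shapes. Both case classes and the helper functions are hypothetical illustrations, not the real `ScalaUDF` signature: a trailing parameter with a default can be omitted silently, while a required one placed right after the inputs must be supplied and can be validated at construction time.

```scala
import scala.util.Try

// Optional trailing flags: a caller can omit them without any compile error.
case class UdfOptionalFlags(
    children: Seq[String],
    inputTypes: Seq[String] = Nil,
    nullableTypes: Seq[Boolean] = Nil)

// Required flags right after the inputs, checked when the object is built.
case class UdfRequiredFlags(
    children: Seq[String],
    handleNullForInputs: Seq[Boolean],
    inputTypes: Seq[String] = Nil) {
  require(handleNullForInputs.length == children.length,
    "one null-handling flag is expected per input")
}

// Mirrors "if no nullability info is available, do nothing".
def nullHandlingSkipped(udf: UdfOptionalFlags): Boolean = udf.nullableTypes.isEmpty

// True if the required-flags variant accepts the given arguments.
def constructs(children: Seq[String], flags: Seq[Boolean]): Boolean =
  Try(UdfRequiredFlags(children, flags)).isSuccess
```

Omitting the flags from `UdfRequiredFlags` does not compile at all, and a length mismatch fails fast instead of silently skipping null handling.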
[GitHub] spark issue #22060: [DO NOT MERGE][TEST ONLY] Add once-policy rule check
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/22060 @maropu I'll follow up on this. I started the test again and I'll keep track of "which rules violate the assumption" and "which tests can reproduce the violation" in this PR.
[GitHub] spark issue #22060: [DO NOT MERGE][TEST ONLY] Add once-policy rule check
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/22060 retest this please
[GitHub] spark issue #22060: [DO NOT MERGE][TEST ONLY] Add once-policy rule check
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/22060 Sorry for the late reply. The purpose of this is to find the rules that violate the once-policy assumption, as well as tests that can reproduce the issues. I think we should eventually turn this check on, after we've fixed all those rules, and extend it to the optimizer too.
[GitHub] spark pull request #22519: [SPARK-25505][SQL] The output order of grouping c...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22519#discussion_r221090624 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -554,8 +554,11 @@ class Analyzer( Cast(value, pivotColumn.dataType, Some(conf.sessionLocalTimeZone)).eval(EmptyRow) } // Group-by expressions coming from SQL are implicit and need to be deduced. -val groupByExprs = groupByExprsOpt.getOrElse( - (child.outputSet -- aggregates.flatMap(_.references) -- pivotColumn.references).toSeq) +val groupByExprs = groupByExprsOpt.getOrElse { + val pivotColAndAggRefs = +(pivotColumn.references ++ aggregates.flatMap(_.references)).toSet --- End diff -- Otherwise the result type would be Iterable and we wouldn't be able to call "contains".
[GitHub] spark pull request #22519: [SPARK-25505][SQL] The output order of grouping c...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22519#discussion_r219624150 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -554,8 +554,10 @@ class Analyzer( Cast(value, pivotColumn.dataType, Some(conf.sessionLocalTimeZone)).eval(EmptyRow) } // Group-by expressions coming from SQL are implicit and need to be deduced. +val pivotColAndAggRefs = --- End diff -- Nice advice!
[GitHub] spark pull request #22519: [SPARK-25505][SQL] The output order of grouping c...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22519#discussion_r219624067 --- Diff: sql/core/src/test/resources/sql-tests/inputs/pivot.sql --- @@ -287,3 +287,13 @@ PIVOT ( sum(earnings) FOR (course, m) IN (('dotNET', map('1', 1)), ('Java', map('2', 2))) ); + +-- grouping columns output in the same order as input +SELECT * FROM ( + SELECT course, earnings, "a" as a, "z" as z, "b" as b, "y" as y, "c" as c, "x" as x, "d" as d, "w" as w --- End diff -- It wouldn't hurt anyway I think...
[GitHub] spark pull request #22519: [SPARK-25505][SQL] The output order of grouping c...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22519#discussion_r219623907 --- Diff: sql/core/src/test/resources/sql-tests/results/pivot.sql.out --- @@ -1,5 +1,5 @@ --- Automatically generated by SQLQueryTestSuite --- Number of queries: 31 +-- Automatically generated by SparkServiceSQLQueryTestSuite --- End diff -- Good catch. Do you have any idea how it ended up this way?
[GitHub] spark pull request #22519: [SPARK-25505][SQL] The output order of grouping c...
GitHub user maryannxue opened a pull request: https://github.com/apache/spark/pull/22519 [SPARK-25505][SQL] The output order of grouping columns in Pivot is different from the input order ## What changes were proposed in this pull request? The grouping columns of a Pivot query are inferred as "input columns - pivot columns - pivot aggregate columns", where the input columns are the output of the Pivot's child relation. The grouping columns will be the leading columns in the pivot output, and they should preserve the order specified by the input. For example,
```
SELECT * FROM (
  SELECT course, earnings, "a" as a, "z" as z, "b" as b, "y" as y,
    "c" as c, "x" as x, "d" as d, "w" as w
  FROM courseSales
)
PIVOT (
  sum(earnings)
  FOR course IN ('dotNET', 'Java')
)
```
The output columns should be "a, z, b, y, c, x, d, w, ..." but currently they are "a, b, c, d, w, x, y, z, ...". The fix is to use the child plan's `output` instead of `outputSet` so that the order is preserved. ## How was this patch tested? Added UT. You can merge this pull request into a Git repository by running: $ git pull https://github.com/maryannxue/spark spark-25505 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22519.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22519 commit bd416bd74ee77329b2527fffecd21f7f90090334 Author: maryannxue Date: 2018-09-21T14:33:16Z [SPARK-25505][SQL] The output order of grouping columns in Pivot is different from the input order
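The ordering issue can be shown with plain Scala collections. This is a simplified sketch: column names stand in for Catalyst attributes and a plain `Set` stands in for `AttributeSet`; it is not the Analyzer code itself. It also shows why a `Set` of references is handy, since its `contains` serves directly as the filter predicate.

```scala
// Hypothetical stand-ins: names instead of attributes, Set instead of AttributeSet.
val childOutput: Seq[String] =
  Seq("course", "earnings", "a", "z", "b", "y", "c", "x", "d", "w")
val pivotColAndAggRefs: Set[String] = Set("course", "earnings")

// Set difference (outputSet-style): the resulting order is not guaranteed.
def groupByViaSet(output: Seq[String], refs: Set[String]): Seq[String] =
  (output.toSet -- refs).toSeq

// Filtering the ordered output keeps the child's column order.
def groupByViaOutput(output: Seq[String], refs: Set[String]): Seq[String] =
  output.filterNot(refs.contains)
```

Both variants yield the same columns, but only the filtered sequence is guaranteed to come back as `a, z, b, y, c, x, d, w`.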
[GitHub] spark issue #22447: [SPARK-25450][SQL] PushProjectThroughUnion rule uses the...
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/22447 retest this please
[GitHub] spark pull request #22447: [SPARK-25450][SQL] PushProjectThroughUnion rule u...
GitHub user maryannxue opened a pull request: https://github.com/apache/spark/pull/22447 [SPARK-25450][SQL] PushProjectThroughUnion rule uses the same exprId for project expressions in each Union child, causing mistakes in constant propagation ## What changes were proposed in this pull request? The problem was caused by the PushProjectThroughUnion rule, which, when creating a new Project for each child of the Union, uses the same exprId for the expressions at the same position. This is wrong because the expressions in each child of the Union are independent, and it can lead to wrong results when other rules, like FoldablePropagation, kick in and treat two different expressions as the same. The fix is to create new expressions in the new Project for each child of the Union. ## How was this patch tested? Added UT. You can merge this pull request into a Git repository by running: $ git pull https://github.com/maryannxue/spark push-project-thru-union-bug Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22447.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22447 commit 7193de3ad8675229eef131214ed62f2ece5cd416 Author: maryannxue Date: 2018-09-18T02:56:07Z fix
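Why a shared exprId is harmful can be sketched with a lookup keyed by exprId, roughly the way a FoldablePropagation-like rule would record folded values. `ExprIds`, `Alias`, and the push-down helpers below are hypothetical simplifications, not Catalyst's classes.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for Catalyst's ExprId allocation.
object ExprIds {
  private val counter = new AtomicLong(100)
  def fresh(): Long = counter.getAndIncrement()
}

// An alias pushed below a Union; `value` is what it folds to in that child.
case class Alias(value: Int, exprId: Long)

// Buggy variant: every child's copy keeps the original exprId.
def pushDownReusingId(original: Alias, childValues: Seq[Int]): Seq[Alias] =
  childValues.map(v => Alias(v, original.exprId))

// Fixed variant: each child's copy gets its own fresh exprId.
def pushDownWithFreshIds(original: Alias, childValues: Seq[Int]): Seq[Alias] =
  childValues.map(v => Alias(v, ExprIds.fresh()))

// A lookup keyed by exprId, as a propagation rule might build.
def foldableMap(aliases: Seq[Alias]): Map[Long, Int] =
  aliases.map(a => a.exprId -> a.value).toMap
```

With reused IDs, the two children's different values collapse into one map entry, so one child's constant can be substituted into the other.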
[GitHub] spark pull request #22406: [SPARK-25415][SQL] Make plan change log in RuleEx...
GitHub user maryannxue opened a pull request: https://github.com/apache/spark/pull/22406 [SPARK-25415][SQL] Make plan change log in RuleExecutor configurable by SQLConf ## What changes were proposed in this pull request? In RuleExecutor, after a rule is applied, if the plan has changed, the before and after plans are logged at level "trace". At times, however, this information can be very helpful for debugging. Making the log level configurable in SQLConf would allow users to turn on the plan change log independently and save them the trouble of tweaking log4j settings. Filtering the plan change log for specific rules can also be very useful. So this PR adds two SQL configurations: 1. spark.sql.optimizer.planChangeLog.level - sets the log level used for logging plan changes after a rule is applied. 2. spark.sql.optimizer.planChangeLog.rules - enables plan change logging only for a set of specified rules, separated by commas. ## How was this patch tested? Added UT. You can merge this pull request into a Git repository by running: $ git pull https://github.com/maryannxue/spark spark-25415 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22406.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22406 commit 3b98a0f6bbcf6bd56341276f1cdb20e32a743faf Author: maryannxue Date: 2018-09-12T21:32:12Z [SPARK-25415][SQL] Make plan change log in RuleExecutor configurable by SQLConf
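The rule-filtering part of this proposal can be sketched without Spark. The helpers below are hypothetical and are not RuleExecutor's code; `rulesConf` merely models the optional comma-separated value of a setting like `spark.sql.optimizer.planChangeLog.rules`, and the log-level part is not modeled.

```scala
// Hypothetical helper: should a change made by `ruleName` be logged?
def shouldLogPlanChange(ruleName: String, rulesConf: Option[String]): Boolean =
  rulesConf match {
    case None       => true // no filter configured: log every rule
    case Some(conf) => conf.split(",").map(_.trim).filter(_.nonEmpty).contains(ruleName)
  }

// Produces a message only when the rule actually changed the plan.
def planChangeMessage(
    ruleName: String,
    before: String,
    after: String,
    rulesConf: Option[String]): Option[String] =
  if (before != after && shouldLogPlanChange(ruleName, rulesConf))
    Some(s"Applying rule $ruleName\n$before\n=>\n$after")
  else None
```

Unchanged plans never produce output, and a configured rule list narrows logging to the named rules only.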
[GitHub] spark issue #22060: [DO NOT MERGE][TEST ONLY] Add once-policy rule check
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/22060 retest this please
[GitHub] spark pull request #22060: [DO NOT MERGE][TEST ONLY] Add once-policy rule ch...
GitHub user maryannxue opened a pull request: https://github.com/apache/spark/pull/22060 [DO NOT MERGE][TEST ONLY] Add once-policy rule check ## What changes were proposed in this pull request? Rules like `HandleNullInputsForUDF` (https://issues.apache.org/jira/browse/SPARK-24891) do not stabilize (they can apply new changes to a plan indefinitely) and can cause problems like SQL cache mismatches. Ideally, all rules, whether in a once-policy batch or a fixed-point-policy batch, should stabilize after the specified number of runs. Once-policy should be considered a performance improvement: an assumption that the rule can stabilize after just one run, rather than an assumption that the rule won't be applied more than once. Once-policy rules should also run fine under a fixed-point policy. We already have a check for fixed-point batches, which throws an exception if the maximum number of runs is reached and the plan is still changing. This PR adds a similar check for once-policy batches, which throws an exception if the plan changes between the first and the second run of a once-policy rule. From the test results, we can find out which analysis rules break this check so we can fix them later. ## How was this patch tested? N/A You can merge this pull request into a Git repository by running: $ git pull https://github.com/maryannxue/spark once_policy Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22060.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22060 commit 323656872799b8dd636061220f3ed139379c9c79 Author: maryannxue Date: 2018-08-09T05:20:32Z Add once-policy batch check
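The once-policy check described above amounts to running a rule a second time and comparing results. A generic sketch, assuming plans can be compared with `==`; `runOnceChecked` and `OncePolicyViolation` are hypothetical names, not RuleExecutor's code.

```scala
// Hypothetical exception type for a once-policy rule that did not stabilize.
final class OncePolicyViolation(msg: String) extends RuntimeException(msg)

// Runs `rule` once, then once more; the second run must not change the plan.
def runOnceChecked[P](ruleName: String, rule: P => P, plan: P): P = {
  val first = rule(plan)
  val second = rule(first)
  if (second != first)
    throw new OncePolicyViolation(
      s"Once-policy rule $ruleName changed the plan on its second run")
  first
}
```

An idempotent rule passes through unchanged, while a rule that keeps wrapping the plan (like the repeated `If` insertion discussed for `HandleNullInputsForUDF`) is reported.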
[GitHub] spark pull request #22049: [SPARK-25063][SQL] Rename class KnowNotNull to Kn...
GitHub user maryannxue opened a pull request: https://github.com/apache/spark/pull/22049 [SPARK-25063][SQL] Rename class KnowNotNull to KnownNotNull ## What changes were proposed in this pull request? Correct the class name typo checked in through SPARK-24891 ## How was this patch tested? Passed all existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/maryannxue/spark known-not-null Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22049.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22049 commit 629d18a1fb1656e7c8fca23827858d9883217d17 Author: maryannxue Date: 2018-08-08T22:59:45Z Rename KnowNotNull to KnownNotNull
[GitHub] spark issue #22049: [SPARK-25063][SQL] Rename class KnowNotNull to KnownNotN...
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/22049 @gatorsmile
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208468677 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -403,20 +415,29 @@ class RelationalGroupedDataset protected[sql]( * * {{{ * // Compute the sum of earnings for each year by course with each course as a separate column - * df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings") + * df.groupBy($"year").pivot($"course", Seq(lit("dotNET"), lit("Java"))).sum($"earnings") + * }}} + * + * For pivoting by multiple columns, use the `struct` function to combine the columns and values: + * + * {{{ + * df + * .groupBy($"year") + * .pivot(struct($"course", $"training"), Seq(struct(lit("java"), lit("Experts" + * .agg(sum($"earnings")) * }}} * * @param pivotColumn the column to pivot. * @param values List of values that will be translated to columns in the output DataFrame. * @since 2.4.0 */ - def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = { + def pivot(pivotColumn: Column, values: Seq[Column]): RelationalGroupedDataset = { --- End diff -- I think @MaxGekk's intention was to keep the old signature as it is, but ended up using "lit", which accepts `Column` too. Correct me if I'm wrong, @MaxGekk. So, back to the choice between `pivot(Column, Seq[Column])` and `pivot(Column, Seq[Any])`: I think an explicit `Seq[Column]` type is less confusing and by itself tells people that we now support complex types in pivot values.
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208466779 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -335,7 +337,7 @@ class RelationalGroupedDataset protected[sql]( * @since 1.6.0 */ def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset = { -pivot(Column(pivotColumn), values) +pivot(Column(pivotColumn), values.map(lit)) --- End diff --
> This is going to allow pivot(String, Seq[Any]) also take Column

I think using "lit" here is what causes the confusion (perhaps @MaxGekk was not aware of that?). We should keep the current behavior of this signature as it is. Using `Column(Literal.create(value))` would do.
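A minimal, Spark-free sketch of the distinction being drawn, assuming (as I understand Spark's `functions.lit`) that `lit` returns an argument that is already a `Column` unchanged and wraps anything else as a literal. `LitModel`, `Expr`, `ColRef`, `Lit`, and `literalOnly` are hypothetical stand-ins, not Spark classes; `literalOnly` approximates the intent of `Column(Literal.create(value))` by refusing expressions, and the exact error Spark would raise is not modeled.

```scala
object LitModel {
  // Hypothetical stand-ins for Spark's Column/Expression types.
  sealed trait Expr
  final case class ColRef(name: String) extends Expr
  final case class Lit(value: Any) extends Expr

  // Pass-through conversion, like the lit behavior assumed above:
  // an existing expression is returned as-is, anything else becomes a literal.
  def lit(v: Any): Expr = v match {
    case e: Expr => e
    case other   => Lit(other)
  }

  // Stricter conversion for the Seq[Any] overload: only plain values are accepted.
  def literalOnly(v: Any): Expr = v match {
    case e: Expr => throw new IllegalArgumentException(s"expected a literal value, got expression $e")
    case other   => Lit(other)
  }

  // Two ways of adapting pivot(String, Seq[Any]) onto a Column-based overload.
  def viaLit(values: Seq[Any]): Seq[Expr] = values.map(lit)
  def viaLiteralOnly(values: Seq[Any]): Seq[Expr] = values.map(literalOnly)
}
```

With `viaLit`, a column reference slipped into the value list survives as a column, which is exactly the widening of the old signature that the comment wants to avoid.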
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208460101 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -403,20 +415,29 @@ class RelationalGroupedDataset protected[sql]( * * {{{ * // Compute the sum of earnings for each year by course with each course as a separate column - * df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings") + * df.groupBy($"year").pivot($"course", Seq(lit("dotNET"), lit("Java"))).sum($"earnings") + * }}} + * + * For pivoting by multiple columns, use the `struct` function to combine the columns and values: + * + * {{{ + * df + * .groupBy($"year") + * .pivot(struct($"course", $"training"), Seq(struct(lit("java"), lit("Experts")))) + * .agg(sum($"earnings")) * }}} * * @param pivotColumn the column to pivot. * @param values List of values that will be translated to columns in the output DataFrame. * @since 2.4.0 */ - def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = { + def pivot(pivotColumn: Column, values: Seq[Column]): RelationalGroupedDataset = { --- End diff --
> The previous interface pivot(Column, Seq[Any]) has existed for more then multiple years. Is this based on actual feedback from users or your speculation?

This is what @MaxGekk added in https://github.com/apache/spark/pull/21699.

> This assumption of yours is not true. See my reply to your comment below.

No. Seq[Any] takes literal values (objects); Seq[Column] takes `Column` expressions.
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208459011 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -403,20 +415,29 @@ class RelationalGroupedDataset protected[sql]( * * {{{ * // Compute the sum of earnings for each year by course with each course as a separate column - * df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings") + * df.groupBy($"year").pivot($"course", Seq(lit("dotNET"), lit("Java"))).sum($"earnings") + * }}} + * + * For pivoting by multiple columns, use the `struct` function to combine the columns and values: + * + * {{{ + * df + * .groupBy($"year") + * .pivot(struct($"course", $"training"), Seq(struct(lit("java"), lit("Experts")))) + * .agg(sum($"earnings")) * }}} * * @param pivotColumn the column to pivot. * @param values List of values that will be translated to columns in the output DataFrame. * @since 2.4.0 */ - def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = { + def pivot(pivotColumn: Column, values: Seq[Column]): RelationalGroupedDataset = { --- End diff --
> pivot(String, Seq[Any]) takes values and columns too (#22030 (comment), I guess). How about we have pivot(Column, Seq[Any]) takes values and columns too?

This assumption of yours is not true. See my reply to your comment below.
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208458861 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -335,7 +337,7 @@ class RelationalGroupedDataset protected[sql]( * @since 1.6.0 */ def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset = { -pivot(Column(pivotColumn), values) +pivot(Column(pivotColumn), values.map(lit)) --- End diff -- Yes, with Seq[Any] we only allow literal values, not `Column`s.
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208458789 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -403,20 +415,29 @@ class RelationalGroupedDataset protected[sql]( * * {{{ * // Compute the sum of earnings for each year by course with each course as a separate column - * df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings") + * df.groupBy($"year").pivot($"course", Seq(lit("dotNET"), lit("Java"))).sum($"earnings") + * }}} + * + * For pivoting by multiple columns, use the `struct` function to combine the columns and values: + * + * {{{ + * df + * .groupBy($"year") + * .pivot(struct($"course", $"training"), Seq(struct(lit("java"), lit("Experts")))) + * .agg(sum($"earnings")) * }}} * * @param pivotColumn the column to pivot. * @param values List of values that will be translated to columns in the output DataFrame. * @since 2.4.0 */ - def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = { + def pivot(pivotColumn: Column, values: Seq[Column]): RelationalGroupedDataset = { --- End diff -- The downside is that if we have both pivot(Column, Seq[Any]) and pivot(Column, Seq[Column]), we would end up with too many versions of pivot, and I'm afraid that would be confusing.
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208453178 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -403,20 +415,29 @@ class RelationalGroupedDataset protected[sql]( * * {{{ * // Compute the sum of earnings for each year by course with each course as a separate column - * df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings") + * df.groupBy($"year").pivot($"course", Seq(lit("dotNET"), lit("Java"))).sum($"earnings") + * }}} + * + * For pivoting by multiple columns, use the `struct` function to combine the columns and values: + * + * {{{ + * df + * .groupBy($"year") + * .pivot(struct($"course", $"training"), Seq(struct(lit("java"), lit("Experts")))) + * .agg(sum($"earnings")) * }}} * * @param pivotColumn the column to pivot. * @param values List of values that will be translated to columns in the output DataFrame. * @since 2.4.0 */ - def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = { + def pivot(pivotColumn: Column, values: Seq[Column]): RelationalGroupedDataset = { --- End diff -- The most fundamental interface we should have is `pivot(Column, Seq[Column])`, which allows any form and any type of pivot column, and the same for pivot values. This is close to what we support in SQL (SQL pivot support will actually be a subset of DataFrame pivot support once we have this interface), and verifying that the pivot values are constant is taken care of in the Analyzer. That said, we still need to keep the old `pivot(String, Seq[Any])` for simple usages and for backward compatibility, but I don't think we need to expand its capability. It is pretty clear to me that pivot(String ...) takes a column name and simple objects, while with pivot(Column ...) you can make any sophisticated use of pivot you like.
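The two-tier design described above can be sketched with a toy expression tree. The full overload accepts arbitrary value expressions and rejects non-constant ones (standing in for the check the comment assigns to the Analyzer), while the name-based overload only wraps its arguments and delegates. All names here (`PivotApiModel`, `Expr`, `foldable`, `Pivot`) are hypothetical and mirror only the shape of the API, not Spark's implementation.

```scala
object PivotApiModel {
  // Hypothetical expression tree; not Spark's Expression hierarchy.
  sealed trait Expr { def foldable: Boolean }
  final case class ColRef(name: String) extends Expr { val foldable = false }
  final case class Lit(value: Any) extends Expr { val foldable = true }
  final case class Struct(children: Seq[Expr]) extends Expr {
    // A struct is constant only if every field is constant.
    def foldable: Boolean = children.forall(_.foldable)
  }

  final case class Pivot(pivotColumn: Expr, values: Seq[Expr])

  // Full form: any pivot expression and any value expressions; the constant
  // check performed here stands in for the Analyzer's validation.
  def pivot(pivotColumn: Expr, values: Seq[Expr]): Pivot = {
    values.find(v => !v.foldable).foreach { v =>
      throw new IllegalArgumentException(s"pivot value must be constant: $v")
    }
    Pivot(pivotColumn, values)
  }

  // Simple form: a column name plus plain objects, reusing the full form.
  def pivot(pivotColumn: String, values: Seq[Any]): Pivot =
    pivot(ColRef(pivotColumn), values.map(Lit(_)))
}
```

In this model, a struct of literals passes the constant check (the multi-column case), while a bare column reference used as a pivot value is rejected.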
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208451663 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -335,7 +337,7 @@ class RelationalGroupedDataset protected[sql]( * @since 1.6.0 */ def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset = { -pivot(Column(pivotColumn), values) +pivot(Column(pivotColumn), values.map(lit)) --- End diff -- Yes, you did. This "old" interface only takes in a single named column (say, "a", but not "a+1") by its name, but we turn it into a `Column` just to reuse the same implementation.
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208423936 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -403,20 +415,29 @@ class RelationalGroupedDataset protected[sql]( * * {{{ * // Compute the sum of earnings for each year by course with each course as a separate column - * df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings") + * df.groupBy($"year").pivot($"course", Seq(lit("dotNET"), lit("Java"))).sum($"earnings") + * }}} + * + * For pivoting by multiple columns, use the `struct` function to combine the columns and values: + * + * {{{ + * df + * .groupBy($"year") + * .pivot(struct($"course", $"training"), Seq(struct(lit("java"), lit("Experts")))) + * .agg(sum($"earnings")) * }}} * * @param pivotColumn the column to pivot. * @param values List of values that will be translated to columns in the output DataFrame. * @since 2.4.0 */ - def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = { + def pivot(pivotColumn: Column, values: Seq[Column]): RelationalGroupedDataset = { --- End diff -- @HyukjinKwon You can just consider `pivot(String, Seq[Any])` as a simplified version of `pivot(Column, Seq[Column])` for users who don't need multiple pivot columns or a pivot column of a complex type. Given that we now have both the fully functional version and the simple version, I don't think adding another signature is necessary.
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208411022 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -384,6 +392,10 @@ class RelationalGroupedDataset protected[sql]( .sort(pivotColumn) // ensure that the output columns are in a consistent logical order .collect() .map(_.get(0)) + .collect { --- End diff -- Use "map"?
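One plausible reason to prefer `map` here (an interpretation, not something the comment states): `collect` with a partial function silently drops every element that no case matches, whereas `map` has to handle every element, so a missing case surfaces as a `MatchError` instead of a pivot value that quietly disappears. A Spark-free illustration, in which tuples are hypothetical stand-ins for multi-field rows:

```scala
object CollectVsMap {
  // Pivot values as they might come back from a distinct query: some are
  // multi-field rows (modeled as tuples), some are plain scalars.
  val pivotValues: Seq[Any] = Seq(("java", "Experts"), "dotNET", 42)

  // collect keeps only the elements the partial function is defined on;
  // the scalars are dropped without any error.
  def onlyTuples: Seq[Any] = pivotValues.collect { case t: Product => t }

  // map must cover every element; the explicit default case makes that visible.
  def rendered: Seq[String] = pivotValues.map {
    case t: Product => t.productIterator.mkString("struct(", ", ", ")")
    case other      => s"lit($other)"
  }
}
```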
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208410422 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -384,6 +392,10 @@ class RelationalGroupedDataset protected[sql]( .sort(pivotColumn) // ensure that the output columns are in a consistent logical order .collect() .map(_.get(0)) + .collect { +case row: GenericRow => struct(row.values.map(lit): _*) --- End diff -- I suspect this will not work for nested struct types or, say, multiple pivot columns with nested types. Could you please add a test like:
```
test("pivoting column list") {
  val expected = ...
  val df = trainingSales
    .groupBy($"sales.year")
    .pivot(struct($"sales", $"training"))
    .agg(sum($"sales.earnings"))
  checkAnswer(df, expected)
}
```
And can we also check whether it works for other complex nested types, like Array(Struct(...))?
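To make the concern concrete: converting only the top level of a row leaves any nested row (or array of rows) as an opaque value, whereas a recursive conversion turns nested rows into nested structs. The sketch below uses hypothetical `Row`, `LitExpr`, `StructExpr`, and `ArrayExpr` types rather than Spark's `GenericRow`, `lit`, `struct`, and `array`. It shows the shape of a recursive conversion, not the fix that was eventually merged.

```scala
object NestedPivotValues {
  // Hypothetical stand-ins: a Row holds field values; Expr is a tiny expression tree.
  final case class Row(values: Seq[Any])
  sealed trait Expr
  final case class LitExpr(value: Any) extends Expr
  final case class StructExpr(fields: Seq[Expr]) extends Expr
  final case class ArrayExpr(elements: Seq[Expr]) extends Expr

  // Top-level-only conversion, analogous to struct(row.values.map(lit): _*):
  // a nested Row ends up wrapped whole inside a literal.
  def shallow(v: Any): Expr = v match {
    case Row(values) => StructExpr(values.map(LitExpr(_)))
    case other       => LitExpr(other)
  }

  // Recursive conversion: nested rows become nested structs, and sequences are
  // converted element by element, so Array(Struct(...)) is covered too.
  def deep(v: Any): Expr = v match {
    case Row(values) => StructExpr(values.map(deep))
    case xs: Seq[_]  => ArrayExpr(xs.map(deep))
    case other       => LitExpr(other)
  }
}
```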
[GitHub] spark issue #21699: [SPARK-24722][SQL] pivot() with Column type argument
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/21699
> Actually I am mostly worry of the pivotColumn. Specifying multiple columns via struct is not intuitive I believe.

It depends on whether we'd like to add extra interfaces for multiple columns. I don't have a preference between reusing this interface for multiple pivot columns and adding new ones, and we can always decide later. But back to this interface: I'd assume it is for more advanced users, and the pivot column, even if it is just a single column, can have a complex type, so "literal object" values might be insufficient. Plus, this is a new interface we haven't released yet, and once we do, we are more likely to end up adding a new one than changing it if we want to make it more sophisticated later on.
[GitHub] spark issue #21699: [SPARK-24722][SQL] pivot() with Column type argument
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/21699 Thank you for the change, @MaxGekk! @HyukjinKwon my idea was actually that the overloaded version of pivot would be `pivot(column: Column, values: Seq[Column])`, so that we can construct different types in "values". The constant check will be done in the Analyzer, so we don't need to worry about it here. Ultimately we would like to support complex-typed values in `pivot(column: Column)` as well, but I think we can do that in a different PR.
[GitHub] spark issue #21699: [SPARK-24722][SQL] pivot() with Column type argument
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/21699 @MaxGekk LGTM, but one more thing to consider: since we support column lists in SQL, it would be nice to support and test them in DataFrame pivot too. The only thing we need to enable is to make pivot values `Expression`s instead of `Literal`s, because `Literal`s do not include struct-type literals, e.g., `struct(1, 2)`. The `Pivot` node already has pivot values as `Seq[Expression]`, so all that is left to be done is in the DataFrame interfaces.
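What a pivot on a struct of two columns computes can be sketched without Spark. The pivot key is the pair of field values, and each pivot value is a pair, which is the kind of struct-typed value that a list of plain `Literal`s cannot express. This is a plain-Scala model with a hypothetical `Sale` record and `pivotSum` helper, loosely following the course/training/earnings example above; missing combinations become `None`, analogous to SQL NULL.

```scala
object MultiColumnPivot {
  // Hypothetical input record, loosely following the course/training/earnings example.
  final case class Sale(year: Int, course: String, training: String, earnings: Double)

  // Pivot on the pair (course, training): each requested pair becomes an output
  // column, and each cell holds the summed earnings for that (year, pair).
  def pivotSum(sales: Seq[Sale], pivotValues: Seq[(String, String)]): Map[Int, Seq[Option[Double]]] =
    sales.groupBy(_.year).map { case (year, rows) =>
      val cells = pivotValues.map { key =>
        val matching = rows.filter(s => (s.course, s.training) == key)
        if (matching.isEmpty) None else Some(matching.map(_.earnings).sum)
      }
      year -> cells
    }
}
```

Usage: with pivot values `Seq(("java", "Experts"), ("dotNET", "Beginner"))`, each year maps to two cells in that order, one per struct value.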
[GitHub] spark issue #21926: [SPARK-24972][SQL] PivotFirst could not handle pivot col...
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/21926 retest this please
[GitHub] spark issue #21699: [SPARK-24722][SQL] pivot() with Column type argument
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/21699 @MaxGekk Please take a look at https://github.com/apache/spark/pull/21926. There was a bug in PivotFirst, and that PR should fix your test here.
[GitHub] spark pull request #21926: [SPARK-24972][SQL] PivotFirst could not handle pi...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/21926#discussion_r206354004 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -574,10 +578,14 @@ class Analyzer( // Since evaluating |pivotValues| if statements for each input row can get slow this is an // alternate plan that instead uses two steps of aggregation. val namedAggExps: Seq[NamedExpression] = aggregates.map(a => Alias(a, a.sql)()) - val bigGroup = groupByExprs ++ pivotColumn.references + val namedPivotCol = pivotColumn match { --- End diff -- This reverts the original workaround for the PivotFirst issue. Now that PivotFirst works correctly for complex types, we can revert it.
[GitHub] spark pull request #21926: [SPARK-24972][SQL] PivotFirst could not handle pi...
GitHub user maryannxue opened a pull request: https://github.com/apache/spark/pull/21926 [SPARK-24972][SQL] PivotFirst could not handle pivot columns of complex types ## What changes were proposed in this pull request? When the pivot column is of a complex type, the eval() result will be an UnsafeRow, while the keys of the HashMap used for column value matching are GenericInternalRows. As a result, there will be no match and the result will always be empty. So for a pivot column of a complex type, we should: 1) If the complex type is not comparable (orderable), throw an Exception. It cannot be a pivot column. 2) Otherwise, if it goes through the `PivotFirst` code path, `PivotFirst` should use a TreeMap instead of a HashMap for such columns. ## How was this patch tested? Added UT. You can merge this pull request into a Git repository by running: $ git pull https://github.com/maryannxue/spark pivot_followup Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21926.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21926 commit b41a45cb22bd3d49e75711950bcbc3d409bc544a Author: maryannxue Date: 2018-07-30T23:15:40Z [SPARK-24972][SQL] PivotFirst could not handle pivot columns of complex types
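The mismatch described above is a general property of hash-keyed lookups. Two objects that represent the same logical value but do not share `equals`/`hashCode` never match in a hash map, while a `TreeMap` consults only the `Ordering` it was given. A Spark-free sketch, using `Array[Int]` (which has reference equality) as a stand-in for the `UnsafeRow`/`GenericInternalRow` pair; `PivotValueLookup` and its members are illustrative names, not Spark code.

```scala
import scala.collection.immutable.TreeMap

object PivotValueLookup {
  // Arrays use reference equality and identity hash codes, much like two
  // different row formats that hold the same logical struct value.
  val pivotValues: Seq[Array[Int]] = Seq(Array(1, 2), Array(3, 4))

  // Content-based ordering: the analogue of comparing rows field by field.
  val contentOrdering: Ordering[Array[Int]] = new Ordering[Array[Int]] {
    def compare(x: Array[Int], y: Array[Int]): Int = {
      val n = math.min(x.length, y.length)
      var i = 0
      while (i < n) {
        val c = Integer.compare(x(i), y(i))
        if (c != 0) return c
        i += 1
      }
      Integer.compare(x.length, y.length)
    }
  }

  // Hash-based index: keys match via hashCode/equals, i.e. by identity here.
  val hashIndex: Map[Array[Int], Int] = pivotValues.zipWithIndex.toMap

  // Ordering-based index: keys match when compare(...) == 0, i.e. by content.
  val treeIndex: TreeMap[Array[Int], Int] = TreeMap(pivotValues.zipWithIndex: _*)(contentOrdering)

  def hashLookup(key: Array[Int]): Option[Int] = hashIndex.get(key)
  def treeLookup(key: Array[Int]): Option[Int] = treeIndex.get(key)
}
```

A freshly built `Array(1, 2)` misses in `hashIndex` but is found in `treeIndex`, which mirrors why the PR switches to a TreeMap for orderable complex types.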
[GitHub] spark issue #21699: [SPARK-24722][SQL] pivot() with Column type argument
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/21699 @MaxGekk Yes, it was caused by my previous PR. The change in my PR was a workaround for an existing problem in either Aggregate or PivotFirst (I suspect it's Aggregate) with struct-type columns. The change itself worked as designed, because Pivot SQL support wouldn't allow any function (like "lowercase") in the pivot column. However, it broke your PR because your PR aims to allow any expression. That said, we have two options here: 1) Give up the PivotFirst approach and fall back to the "else" branch for struct-type pivot columns, i.e., multiple columns in the pivot FOR clause. 2) Fix the bug in Aggregate or PivotFirst. I will do a little investigation into option 2) tomorrow and get back to you :)
[GitHub] spark issue #21875: [SPARK-24288][SQL] Add a JDBC Option to enable preventin...
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/21875 Programming guide updated. Thank you, @dilipbiswal and @HyukjinKwon!
[GitHub] spark pull request #21876: [SPARK-24802][SQL][FOLLOW-UP] Add a new config fo...
GitHub user maryannxue opened a pull request: https://github.com/apache/spark/pull/21876 [SPARK-24802][SQL][FOLLOW-UP] Add a new config for Optimization Rule Exclusion ## What changes were proposed in this pull request? This is an extension to the original PR, in which rule exclusion did not work for classes derived from Optimizer, e.g., SparkOptimizer. To solve this issue, Optimizer and its derived classes will define/override `defaultBatches` and `nonExcludableRules` in order to define their default rule sets as well as the rules that cannot be excluded by the SQL config. Meanwhile, Optimizer's `batches` method is dedicated to the rule exclusion logic and is declared `final`. ## How was this patch tested? Added UT. You can merge this pull request into a Git repository by running: $ git pull https://github.com/maryannxue/spark rule-exclusion Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21876.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21876 commit eaec2f5f2b4e3193de41655b84a1dc936b0e50a3 Author: maryannxue Date: 2018-07-13T21:32:01Z [SPARK-24802] Optimization Rule Exclusion commit 84f1a6b5cba08df8684179e9d7195545be655e76 Author: maryannxue Date: 2018-07-18T06:13:50Z Address review comments commit ff23edf81a4a78d1589ed582a1802b94a8ebf4c6 Author: maryannxue Date: 2018-07-21T02:37:54Z Address review comments commit b154979236e211dc7185ca8e450493f0c6b0f469 Author: maryannxue Date: 2018-07-21T02:41:21Z change test name commit 87afe4fbcaf71d303b07612f9ceb9ad25dd3dcda Author: maryannxue Date: 2018-07-23T00:35:01Z address review comments commit 39b6ce9548c99363e81cb246b4cbe5534d710f3e Author: maryannxue Date: 2018-07-23T04:28:00Z address review comments commit a2161ef1f333f2cc039df0ecc8c96e5ec27e00ff Author: maryannxue Date: 2018-07-25T18:52:13Z Merge remote-tracking branch 'origin/master' into rule-exclusion commit 3730053d7386188042b2f2d4bd6784c3de722df6 Author: maryannxue Date: 2018-07-25T20:08:19Z Extend rule-exclusion to Optimizer sub-classes, esp. SparkOptimizer
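The exclusion scheme can be modeled in a few lines under simplifying assumptions: rules are identified by name strings, batches are plain case classes, and the config value is an optional comma-separated string, parsed the same way as the `split(",").map(_.trim).filter(_.nonEmpty)` expression quoted in the `Optimizer.batches` review. `RuleExclusionModel` and the rule names used with it are hypothetical; Spark's `Optimizer` works with `Rule` objects and its own `Batch` type.

```scala
object RuleExclusionModel {
  // Hypothetical model: rules are identified by name only.
  final case class Batch(name: String, rules: Seq[String])

  // Parse a comma-separated config value, ignoring blanks and surrounding spaces.
  def parseExcluded(conf: Option[String]): Set[String] =
    conf.toSeq.flatMap(_.split(",").map(_.trim).filter(_.nonEmpty).toSeq).toSet

  // Drop excluded rules from every batch, except rules that may never be
  // excluded; batches left without any rules are removed entirely.
  def batches(
      defaultBatches: Seq[Batch],
      nonExcludableRules: Set[String],
      excludedConf: Option[String]): Seq[Batch] = {
    val excluded = parseExcluded(excludedConf) -- nonExcludableRules
    defaultBatches
      .map(b => b.copy(rules = b.rules.filterNot(excluded.contains)))
      .filter(_.rules.nonEmpty)
  }
}
```

Keeping the filtering in one place that subclasses only feed (via their default batches and non-excludable set) is the idea behind making `batches` final in the PR.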
[GitHub] spark pull request #21875: [SPARK-24288][SQL] Add a JDBC Option to enable pr...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/21875#discussion_r205268701 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala --- @@ -172,7 +172,11 @@ private[sql] case class JDBCRelation( // Check if JDBCRDD.compileFilter can accept input filters override def unhandledFilters(filters: Array[Filter]): Array[Filter] = { -filters.filter(JDBCRDD.compileFilter(_, JdbcDialects.get(jdbcOptions.url)).isEmpty) +if (jdbcOptions.pushDownPredicate) { --- End diff -- No. I actually share your opinion. It is confusing here; maybe we should change the parameter names at some point.
[GitHub] spark pull request #21875: [SPARK-24288][SQL] Add a JDBC Option to enable pr...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/21875#discussion_r205267327 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -183,6 +183,9 @@ class JDBCOptions( } // An option to execute custom SQL before fetching data from the remote DB val sessionInitStatement = parameters.get(JDBC_SESSION_INIT_STATEMENT) + + // An option to allow/disallow pushing down predicate into JDBC data source + val pushDownPredicate = parameters.getOrElse(JDBC_PUSHDOWN_PREDICATE, "true").toBoolean --- End diff -- Or one could argue that "predicate" refers to all filters as a whole. It's a nice reminder, though; I had not thought about it. Anyway, I just checked: we use `PushDownPredicate` and the singular form in similar rules. So maybe we keep it singular here too?
[GitHub] spark pull request #21875: [SPARK-24288][SQL] Add a JDBC Option to enable pr...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/21875#discussion_r205266067 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala --- @@ -172,7 +172,11 @@ private[sql] case class JDBCRelation( // Check if JDBCRDD.compileFilter can accept input filters override def unhandledFilters(filters: Array[Filter]): Array[Filter] = { -filters.filter(JDBCRDD.compileFilter(_, JdbcDialects.get(jdbcOptions.url)).isEmpty) +if (jdbcOptions.pushDownPredicate) { --- End diff -- Yes, this is the only source of truth for deciding which filters are handled or unhandled. The caller calls this method and pushes the "handled" filters to scanTable, in this case JDBCRDD.
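A sketch of the handled/unhandled split under discussion, with hypothetical `Filter` case classes and a toy `compileFilter` in place of Spark's `sources.Filter` and `JDBCRDD.compileFilter`. The option name and its default of `true` follow the `JDBCOptions` diff; everything else is illustrative. When push-down is disabled, every filter is reported as unhandled, so no filter counts as handled and none is passed to the scan.

```scala
object JdbcFilterModel {
  // Hypothetical filter types; Spark's real ones live in org.apache.spark.sql.sources.
  sealed trait Filter
  final case class EqualTo(attribute: String, value: Any) extends Filter
  final case class UnsupportedPredicate(attribute: String) extends Filter

  // Toy stand-in for compiling a filter into a WHERE fragment; None means the
  // dialect cannot express it. No quoting or escaping: illustration only.
  def compileFilter(f: Filter): Option[String] = f match {
    case EqualTo(a, v: String) => Some(s"$a = '$v'")
    case EqualTo(a, v)         => Some(s"$a = $v")
    case _                     => None
  }

  // Option parsing with a default of true, as in the JDBCOptions diff.
  def pushDownPredicate(parameters: Map[String, String]): Boolean =
    parameters.getOrElse("pushDownPredicate", "true").toBoolean

  // With push-down disabled, every filter is unhandled; otherwise only the
  // filters that cannot be compiled are.
  def unhandledFilters(filters: Seq[Filter], parameters: Map[String, String]): Seq[Filter] =
    if (pushDownPredicate(parameters)) filters.filter(f => compileFilter(f).isEmpty)
    else filters
}
```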
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/21403 @mgaido91 I see. But by using Seq[Expression] in `In`, we could hopefully remove `ResolveInValues`. I wouldn't mind changing the parser if it's necessary and if it saves work elsewhere. Having a temporary expression that is nothing more than a wrapper around Seq[Expression] doesn't look like a very clean approach to me.
[GitHub] spark issue #21875: [SPARK-24288][SQL] Enable preventing predicate pushdown
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/21875 @gatorsmile @TomaszGaweda
[GitHub] spark pull request #21875: [SPARK-24288][SQL] Enable preventing predicate pu...
GitHub user maryannxue opened a pull request: https://github.com/apache/spark/pull/21875 [SPARK-24288][SQL] Enable preventing predicate pushdown ## What changes were proposed in this pull request? Add a JDBC option "pushDownPredicate" (default `true`) to allow/disallow predicate push-down in the JDBC data source. ## How was this patch tested? Added a test in `JDBCSuite`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/maryannxue/spark spark-24288 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21875.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21875 commit a83b64b53aa043d4b1cc9b5572c3676ec168027c Author: maryannxue Date: 2018-07-25T18:36:53Z [SPARK-24288][SQL] Enable preventing predicate pushdown
[GitHub] spark pull request #21360: [SPARK-24288] Enable preventing predicate pushdow...
Github user maryannxue closed the pull request at: https://github.com/apache/spark/pull/21360
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/21821 LGTM.
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/21403 I think the behavior definition is good and clear. But just a question on the implementation: is it necessary to introduce a new class `InValues`, or could we simply make `In`'s first child, "value", of type `Seq[Expression]`?
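For comparison, a sketch of the alternative being suggested: the left side of `In` is itself a sequence, so a single-column IN is just the one-element case and no wrapper node is needed. `MultiValueInModel` is hypothetical and evaluates already-computed values; SQL three-valued NULL semantics are deliberately not modeled.

```scala
object MultiValueInModel {
  // Hypothetical: an IN predicate whose left side is a list of values, so
  // (a, b) IN ((1, 2), (3, 4)) needs no separate wrapper node.
  final case class In(values: Seq[Any], list: Seq[Seq[Any]]) {
    // Every list entry must have the same arity as the left side.
    require(list.forall(_.size == values.size),
      s"every IN-list entry must have ${values.size} element(s)")

    // A single-column IN is simply the one-element case.
    def eval: Boolean = list.contains(values)
  }
}
```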
[GitHub] spark pull request #21851: [SPARK-24891][SQL] Fix HandleNullInputsForUDF rul...
GitHub user maryannxue opened a pull request: https://github.com/apache/spark/pull/21851 [SPARK-24891][SQL] Fix HandleNullInputsForUDF rule ## What changes were proposed in this pull request? The HandleNullInputsForUDF rule would add a new `If` node every time it was applied. That caused a difference between the same plan analyzed once and analyzed twice (or more), raising issues such as plans not being matched in the cache manager. The solution is to mark the arguments as null-checked, i.e., to add an "AssertNotNull" node above those arguments when putting the UDF under an `If` node, because the UDF will clearly not be called when any of those arguments is null. ## How was this patch tested? Added new tests in sql/UDFSuite and AnalysisSuite. You can merge this pull request into a Git repository by running: $ git pull https://github.com/maryannxue/spark spark-24891 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21851.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21851 commit 62fa9cf99610d8fa67d123450f2721cac0b5899f Author: maryannxue Date: 2018-07-23T18:56:05Z [SPARK-24891][SQL] Fix HandleNullInputsForUDF rule
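The idempotency problem and the marking fix can be illustrated with a toy rewrite rule. The PR describes wrapping the UDF in an `If` and marking its arguments with `AssertNotNull`; here those roles are played by hypothetical `NullIf` and `NullChecked` nodes, and every argument is treated as needing the check (a simplification). Applying `naive` twice nests a second null check, while `idempotent` leaves an already rewritten tree unchanged.

```scala
object NullCheckRuleModel {
  // Hypothetical expression tree; NullChecked plays the role of the marker node.
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class NullChecked(child: Expr) extends Expr
  final case class IsNull(child: Expr) extends Expr
  final case class Or(left: Expr, right: Expr) extends Expr
  final case class Udf(args: Seq[Expr]) extends Expr
  // Evaluates to null when `cond` holds, otherwise to `otherwise`.
  final case class NullIf(cond: Expr, otherwise: Expr) extends Expr

  private def anyNull(args: Seq[Expr]): Expr =
    args.map(IsNull(_)).reduce[Expr]((l, r) => Or(l, r))

  // Naive rule: wraps every UDF it sees, including one that is already wrapped,
  // so a second application produces a different tree.
  def naive(e: Expr): Expr = e match {
    case NullIf(c, o)                   => NullIf(c, naive(o))
    case u @ Udf(args) if args.nonEmpty => NullIf(anyNull(args), u)
    case other                          => other
  }

  // Marking rule: wraps a UDF only if some argument is not yet marked, and marks
  // the arguments while wrapping, so re-applying the rule is a no-op.
  def idempotent(e: Expr): Expr = e match {
    case NullIf(c, o) => NullIf(c, idempotent(o))
    case Udf(args) if args.exists(a => !a.isInstanceOf[NullChecked]) =>
      val unchecked = args.filterNot(_.isInstanceOf[NullChecked])
      val marked = args.map {
        case n: NullChecked => n
        case a              => NullChecked(a)
      }
      NullIf(anyNull(unchecked), Udf(marked))
    case other => other
  }
}
```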
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/21821 I just ran a test with a once-strategy check and found that a few batches/rules do not stop, e.g., AggregatePushDown, "Convert to Spark client exec", and PartitionPruning. I believe most of them are edge rules, and none of them are analyzer rules. Still, let's keep this fix just to be on the safe side.
[GitHub] spark pull request #21764: [SPARK-24802][SQL] Add a new config for Optimizat...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/21764#discussion_r204279843 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -175,6 +191,41 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) * Override to provide additional rules for the operator optimization batch. */ def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil + + override def batches: Seq[Batch] = { +val excludedRulesConf = + SQLConf.get.optimizerExcludedRules.toSeq.flatMap(_.split(",").map(_.trim).filter(_.nonEmpty)) --- End diff -- No reason. I just didn't know about it. Thank you for pointing this out!
[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/21821 Yes, @gatorsmile. Code is ready. Will post a PR shortly.
[GitHub] spark pull request #21764: [SPARK-24802] Optimization Rule Exclusion
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/21764#discussion_r203731087 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -127,6 +127,14 @@ object SQLConf { } } + val OPTIMIZER_EXCLUDED_RULES = buildConf("spark.sql.optimizer.excludedRules") --- End diff -- +1 on the debugging purpose. Still, CacheManager matches the *analyzed* plan, not the optimized plan.
[GitHub] spark pull request #21764: [SPARK-24802] Optimization Rule Exclusion
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/21764#discussion_r203730778 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala --- @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.internal.SQLConf.OPTIMIZER_EXCLUDED_RULES + + +class OptimizerRuleExclusionSuite extends PlanTest { --- End diff -- Added :)
[GitHub] spark pull request #21764: [SPARK-24802] Optimization Rule Exclusion
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/21764#discussion_r203730652 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -160,6 +160,13 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) UpdateNullabilityInAttributeReferences) } + def nonExcludableBatches: Seq[String] = +"Eliminate Distinct" :: + "Finish Analysis" :: + "Replace Operators" :: + "Pullup Correlated Expressions" :: + "RewriteSubquery" :: Nil --- End diff -- I'll change this to a rule blacklist.
[GitHub] spark pull request #21764: [SPARK-24802] Optimization Rule Exclusion
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/21764#discussion_r203730125 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -175,6 +182,44 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) * Override to provide additional rules for the operator optimization batch. */ def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil + + override def batches: Seq[Batch] = { +val excludedRules = + SQLConf.get.optimizerExcludedRules.toSeq.flatMap(_.split(",").map(_.trim).filter(!_.isEmpty)) --- End diff -- There is an auto-generated field `ruleName` in `Rule`, so we do exact (case-sensitive) name matching.
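Deriving a rule's name from its class and matching it exactly can be sketched as follows. This is a simplified, hypothetical stand-in for Catalyst's `Rule`, assuming the name is the runtime class name with the trailing `$` that Scala appends to `object` classes removed; for a top-level object that is its fully qualified class name. Because the sketch nests its objects inside `RuleNameSketch`, their names carry that enclosing prefix.

```scala
object RuleNameSketch {
  // Simplified stand-in for a rule base class (hypothetical).
  abstract class Rule {
    // Runtime class name; Scala `object`s compile to classes ending in '$', which is dropped.
    val ruleName: String = {
      val className = getClass.getName
      if (className.endsWith("$")) className.dropRight(1) else className
    }
  }

  object CombineFilters extends Rule
  object PushDownPredicate extends Rule

  /** Exact, case-sensitive match against the configured names. */
  def isExcluded(rule: Rule, excludedNames: Seq[String]): Boolean =
    excludedNames.contains(rule.ruleName)
}
```

Since matching is a plain `contains`, a lower-cased or abbreviated name in the config would not exclude anything.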
[GitHub] spark issue #21720: [SPARK-24163][SPARK-24164][SQL] Support column list as t...
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/21720 retest this please
[GitHub] spark pull request #21764: [SPARK-24802] Optimization Rule Exclusion
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/21764#discussion_r202786530 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -127,6 +127,14 @@ object SQLConf { } } + val OPTIMIZER_EXCLUDED_RULES = buildConf("spark.sql.optimizer.excludedRules") --- End diff -- Are you talking about the SQL cache? I don't think the optimizer has anything to do with the SQL cache, though, since the logical plans used to match cache entries are "analyzed" plans, not "optimized" plans.
[GitHub] spark pull request #21764: [SPARK-24802] Optimization Rule Exclusion
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/21764#discussion_r202762054 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -127,6 +127,14 @@ object SQLConf { } } + val OPTIMIZER_EXCLUDED_RULES = buildConf("spark.sql.optimizer.excludedRules") +.doc("Configures a list of rules to be disabled in the optimizer, in which the rules are " + + "specified by their rule names and separated by comma. It is not guaranteed that all the " + + "rules in this configuration will eventually be excluded, as some rules are necessary " + --- End diff -- Nice suggestion! @gatorsmile's other suggestion was to introduce a blacklist, which would make it possible to enumerate the rules that cannot be excluded. I can add a warning as well.
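A blacklist of non-excludable rules combined with a warning could look roughly like this. The names in `nonExcludableRules` (borrowed from the "Finish Analysis" batch in the diff) and the warning text are hypothetical placeholders, not the final list or message, and short names are used instead of fully qualified ones for brevity.

```scala
object NonExcludableSketch {
  // Hypothetical blacklist; the real set of required rules may differ.
  val nonExcludableRules: Set[String] = Set("ReplaceExpressions", "ComputeCurrentTime")

  /** Splits requested exclusions into those honored and warnings for those refused. */
  def effectiveExclusions(requested: Seq[String]): (Seq[String], Seq[String]) = {
    val (refused, honored) = requested.partition(nonExcludableRules.contains)
    val warnings = refused.map(name => s"Rule '$name' is required and was not excluded.")
    (honored, warnings)
  }
}
```

Returning the warnings instead of logging them directly keeps the sketch testable; a real implementation would presumably log them.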
[GitHub] spark pull request #21764: [SPARK-24802] Optimization Rule Exclusion
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/21764#discussion_r202760924 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -175,6 +179,35 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) * Override to provide additional rules for the operator optimization batch. */ def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil + + override def batches: Seq[Batch] = { +val excludedRules = + SQLConf.get.optimizerExcludedRules.toSeq.flatMap(_.split(",").map(_.trim).filter(!_.isEmpty)) +val filteredOptimizationBatches = if (excludedRules.isEmpty) { + optimizationBatches +} else { + optimizationBatches.flatMap { batch => +val filteredRules = + batch.rules.filter { rule => +val exclude = excludedRules.contains(rule.ruleName) +if (exclude) { + logInfo(s"Optimization rule '${rule.ruleName}' is excluded from the optimizer.") +} +!exclude + } +if (batch.rules == filteredRules) { --- End diff -- It serves two purposes: 1) to avoid unnecessary object creation if all rules have been preserved, and 2) to avoid empty batches if all rules in the batch have been removed.
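The two goals of the `batch.rules == filteredRules` check can be sketched with a simplified, hypothetical `Batch` whose rules are modeled as plain name strings (an assumption for brevity): an unchanged batch is returned as the same object, and a batch left with no rules is dropped.

```scala
object BatchFilterSketch {
  // Simplified batch: rules are modeled by name only.
  final case class Batch(name: String, rules: Seq[String])

  def filterBatches(batches: Seq[Batch], excluded: Set[String]): Seq[Batch] =
    if (excluded.isEmpty) batches
    else batches.flatMap { batch =>
      val kept = batch.rules.filterNot(excluded.contains)
      if (kept == batch.rules) Some(batch)   // 1) reuse the original object
      else if (kept.isEmpty) None            // 2) drop a batch with no rules left
      else Some(batch.copy(rules = kept))
    }
}
```

Returning an `Option` from `flatMap` handles both the "keep" and "drop" cases in a single pass.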
[GitHub] spark pull request #21764: [SPARK-24802] Optimization Rule Exclusion
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/21764#discussion_r202759884 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -46,7 +47,23 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations) - def batches: Seq[Batch] = { + protected def postAnalysisBatches: Seq[Batch] = { +Batch("Eliminate Distinct", Once, EliminateDistinct) :: +// Technically some of the rules in Finish Analysis are not optimizer rules and belong more +// in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime). +// However, because we also use the analyzer to canonicalized queries (for view definition), +// we do not eliminate subqueries or compute current time in the analyzer. +Batch("Finish Analysis", Once, + EliminateSubqueryAliases, + EliminateView, + ReplaceExpressions, + ComputeCurrentTime, + GetCurrentDatabase(sessionCatalog), + RewriteDistinctAggregates, + ReplaceDeduplicateWithAggregate) :: Nil + } + + protected def optimizationBatches: Seq[Batch] = { --- End diff -- So can I use a blacklist of batches?