[GitHub] spark issue #22898: [SPARK-25746][SQL][followup] do not add unnecessary If e...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22898

cc @viirya
[GitHub] spark pull request #22898: [SPARK-25746][SQL][followup] do not add unnecessa...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/22898

[SPARK-25746][SQL][followup] do not add unnecessary If expression

## What changes were proposed in this pull request?

A followup of https://github.com/apache/spark/pull/22749.

When we construct the new serializer in `ExpressionEncoder.tuple`, we don't need to add an `if(isnull ...)` check for each field. The fields are either simple expressions that propagate null correctly (e.g. `GetStructField(GetColumnByOrdinal(0, schema), index)`), or complex expressions that already have the isnull check.

## How was this patch tested?

existing tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark minor

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22898.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22898

commit 82664439318b72d8446230515abb882b89767bb9
Author: Wenchen Fan
Date:   2018-10-31T05:44:44Z

    do not add unnecessary If expression
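To make the null-propagation argument concrete, here is a minimal sketch using the Catalyst expression classes named above (the `schema` and field ordinal are made up for the example):

```scala
import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal
import org.apache.spark.sql.catalyst.expressions.{GetStructField, If, IsNull, Literal}
import org.apache.spark.sql.types.{IntegerType, StructType}

// GetStructField evaluates to null whenever its input struct is null,
// so wrapping it in an explicit isnull check duplicates that behavior.
val schema = new StructType().add("f0", IntegerType)
val input = GetColumnByOrdinal(0, schema)
val field = GetStructField(input, 0) // already null-propagating on its own
val redundant = If(IsNull(input), Literal(null, IntegerType), field) // the wrapper this PR drops
```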
[GitHub] spark issue #21860: [SPARK-24901][SQL]Merge the codegen of RegularHashMap an...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21860

retest this please
[GitHub] spark issue #22713: [SPARK-25691][SQL] Use semantic equality in AliasViewChi...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22713

thanks, merging to master!
[GitHub] spark pull request #22847: [SPARK-25850][SQL] Make the split threshold for t...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22847#discussion_r229538148

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -812,6 +812,17 @@ object SQLConf {
     .intConf
     .createWithDefault(65535)

+  val CODEGEN_METHOD_SPLIT_THRESHOLD = buildConf("spark.sql.codegen.methodSplitThreshold")
+    .internal()
+    .doc("The threshold of source code length without comment of a single Java function by " +
+      "codegen to be split. When the generated Java function source code exceeds this threshold" +
+      ", it will be split into multiple small functions. We cannot know how many bytecode will " +
+      "be generated, so use the code length as metric. When running on HotSpot, a function's " +
+      "bytecode should not go beyond 8KB, otherwise it will not be JITted; it also should not " +
+      "be too small, otherwise there will be many function calls.")
+    .intConf
+    .createWithDefault(1024)

--- End diff --

let's add a check value to make sure the value is positive. We can figure out a lower and upper bound later.
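For reference, a minimal sketch of the suggested guard, assuming the `ConfigBuilder` DSL already used in `SQLConf` (the error message wording is illustrative; the doc string is abbreviated):

```scala
val CODEGEN_METHOD_SPLIT_THRESHOLD = buildConf("spark.sql.codegen.methodSplitThreshold")
  .internal()
  .doc("The threshold of source code length of a single generated Java function ...")
  .intConf
  // Reject non-positive thresholds at configuration time.
  .checkValue(threshold => threshold > 0, "The method split threshold must be positive.")
  .createWithDefault(1024)
```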
[GitHub] spark pull request #22788: [SPARK-25769][SQL]escape nested columns by backti...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22788#discussion_r229538040

--- Diff: sql/core/src/test/resources/sql-tests/results/columnresolution-negative.sql.out ---
@@ -161,7 +161,7 @@ SELECT db1.t1.i1 FROM t1, mydb2.t1
 struct<>
 -- !query 18 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`db1.t1.i1`' given input columns: [mydb2.t1.i1, mydb2.t1.i1]; line 1 pos 7
+cannot resolve '`db1`.`t1`.`i1`' given input columns: [mydb2.t1.i1, mydb2.t1.i1]; line 1 pos 7

--- End diff --

LGTM
[GitHub] spark issue #22857: [SPARK-25860][SQL] Replace Literal(null, _) with FalseLi...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22857

LGTM except the end-to-end test
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r229537395

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
@@ -2585,4 +2585,45 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     checkAnswer(swappedDf.filter($"key"($"map") > "a"), Row(2, Map(2 -> "b")))
   }
+
+  test("SPARK-25860: Replace Literal(null, _) with FalseLiteral whenever possible") {
+
+    def checkPlanIsEmptyLocalScan(df: DataFrame): Unit = df.queryExecution.executedPlan match {
+      case s: LocalTableScanExec => assert(s.rows.isEmpty)
+      case p => fail(s"$p is not LocalTableScanExec")
+    }
+
+    val df1 = Seq((1, true), (2, false)).toDF("l", "b")
+    val df2 = Seq(2, 3).toDF("l")
+
+    val q1 = df1.where("IF(l > 10, false, b AND null)")
+    checkAnswer(q1, Seq.empty)
+    checkPlanIsEmptyLocalScan(q1)
+
+    val q2 = df1.where("CASE WHEN l < 10 THEN null WHEN l > 40 THEN false ELSE null END")
+    checkAnswer(q2, Seq.empty)
+    checkPlanIsEmptyLocalScan(q2)
+
+    val q3 = df1.join(df2, when(df1("l") > df2("l"), lit(null)).otherwise(df1("b") && lit(null)))
+    checkAnswer(q3, Seq.empty)
+    checkPlanIsEmptyLocalScan(q3)
+
+    val q4 = df1.where("IF(IF(b, null, false), true, null)")
+    checkAnswer(q4, Seq.empty)
+    checkPlanIsEmptyLocalScan(q4)
+
+    val q5 = df1.selectExpr("IF(l > 1 AND null, 5, 1) AS out")
+    checkAnswer(q5, Row(1) :: Row(1) :: Nil)
+    q5.queryExecution.executedPlan.foreach { p =>
+      assert(p.expressions.forall(e => e.find(_.isInstanceOf[If]).isEmpty))

--- End diff --

This test can pass without the optimization. The `ConvertToLocalRelation` rule will eliminate the `Project`. Can we use a table as input data? e.g.

```
withTable("t1", "t2") {
  Seq((1, true), (2, false)).toDF("l", "b").write.saveAsTable("t1")
  Seq(2, 3).toDF("l").write.saveAsTable("t2")
  val df1 = spark.table("t1")
  val df2 = spark.table("t2")
  ...
}
```
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r229537117

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
@@ -2585,4 +2585,45 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     checkAnswer(swappedDf.filter($"key"($"map") > "a"), Row(2, Map(2 -> "b")))
   }
+
+  test("SPARK-25860: Replace Literal(null, _) with FalseLiteral whenever possible") {

--- End diff --

it's weird to put an optimizer end-to-end test in `DataFrameSuite`. Can we create a `ReplaceNullWithFalseEndToEndSuite`?
[GitHub] spark issue #22713: [SPARK-25691][SQL] Use semantic equality in AliasViewChi...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22713

LGTM. To confirm, this is a potential bug that end-users currently can't hit, right?
[GitHub] spark pull request #22713: [SPARK-25691][SQL] Use semantic equality in Alias...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22713#discussion_r229307114

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala ---
@@ -604,4 +606,28 @@ class AnalysisSuite extends AnalysisTest with Matchers {
     checkAnalysis(input, expected)
   }
 }
+
+  test("SPARK-25691: AliasViewChild with different nullabilities") {
+    object ViewAnalyzer extends RuleExecutor[LogicalPlan] {
+      val batches = Batch("View", Once, AliasViewChild(conf), EliminateView) :: Nil
+    }
+    def intNotNullableAttr(name: String): Attribute = {
+      AttributeReference(name, IntegerType, nullable = false)()
+    }
+    val relation = LocalRelation(intNotNullableAttr("a"), 'b.string)

--- End diff --

nit: `'a.int.notNull`
[GitHub] spark issue #22885: [BUILD][MINOR] release script should not interrupt by sv...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22885

thanks, merging to master!
[GitHub] spark pull request #21860: [SPARK-24901][SQL]Merge the codegen of RegularHas...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21860#discussion_r229283365

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -854,33 +862,50 @@ case class HashAggregateExec(
     val updateRowInHashMap: String = {
       if (isFastHashMapEnabled) {
-        ctx.INPUT_ROW = fastRowBuffer
-        val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
-        val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
-        val effectiveCodes = subExprs.codes.mkString("\n")
-        val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
-          boundUpdateExpr.map(_.genCode(ctx))
-        }
-        val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i) =>
-          val dt = updateExpr(i).dataType
-          CodeGenerator.updateColumn(
-            fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorizedHashMapEnabled)
-        }
+        if (isVectorizedHashMapEnabled) {
+          ctx.INPUT_ROW = fastRowBuffer
+          val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
+          val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+          val effectiveCodes = subExprs.codes.mkString("\n")
+          val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
+            boundUpdateExpr.map(_.genCode(ctx))
+          }
+          val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i) =>
+            val dt = updateExpr(i).dataType
+            CodeGenerator.updateColumn(
+              fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorized = true)
+          }

-        // If fast hash map is on, we first generate code to update row in fast hash map, if the
-        // previous loop up hit fast hash map. Otherwise, update row in regular hash map.
-        s"""
-           |if ($fastRowBuffer != null) {
-           |  // common sub-expressions
-           |  $effectiveCodes
-           |  // evaluate aggregate function
-           |  ${evaluateVariables(fastRowEvals)}
-           |  // update fast row
-           |  ${updateFastRow.mkString("\n").trim}
-           |} else {
-           |  $updateRowInRegularHashMap
-           |}
-         """.stripMargin
+          // If vectorized fast hash map is on, we first generate code to update row
+          // in vectorized fast hash map, if the previous loop up hit vectorized fast hash map.
+          // Otherwise, update row in regular hash map.
+          s"""
+             |if ($fastRowBuffer != null) {
+             |  // common sub-expressions
+             |  $effectiveCodes
+             |  // evaluate aggregate function
+             |  ${evaluateVariables(fastRowEvals)}
+             |  // update fast row
+             |  ${updateFastRow.mkString("\n").trim}
+             |} else {
+             |  $updateRowInRegularHashMap
+             |}
+           """.stripMargin
+        } else {
+          // If row-based hash map is on and the previous loop up hit fast hash map,
+          // we reuse regular hash buffer to update row of fast hash map.
+          // Otherwise, update row in regular hash map.
+          s"""
+             |// Updates the proper row buffer
+             |UnsafeRow $updatedAggBuffer = null;

--- End diff --

OK now I understand what's going on here. I still think we don't need this variable. We can generate:

```
if ($fastRowBuffer != null) {
  $unsafeRowBuffer = $fastRowBuffer
}
$updateRowInRegularHashMap
```

And then we don't need to change `updateRowInRegularHashMap`. Note that the readability of the Scala code is more important than the readability of the generated Java code.
[GitHub] spark issue #22755: [SPARK-25755][SQL][Test] Supplementation of non-CodeGen ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22755

thanks, merging to master!
[GitHub] spark pull request #22892: [SPARK-25884][SQL] Add TBLPROPERTIES and COMMENT,...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22892#discussion_r229274492

--- Diff: sql/core/src/test/resources/sample.json ---
@@ -0,0 +1,2 @@
+{"a" : "2" ,"b" : "blah", "c_!@(3)":1}

--- End diff --

why add this file?
[GitHub] spark issue #22755: [SPARK-25755][SQL][Test] Supplementation of non-CodeGen ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22755

retest this please
[GitHub] spark issue #22755: [SPARK-25755][SQL][Test] Supplementation of non-CodeGen ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22755

LGTM
[GitHub] spark pull request #22885: [BUILD][MINOR] release script should not interrup...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/22885

[BUILD][MINOR] release script should not interrupt by svn

## What changes were proposed in this pull request?

When running the release script, you will be interrupted unexpectedly:

```
ATTENTION!  Your password for authentication realm:

   <https://dist.apache.org:443> ASF Committers

can only be stored to disk unencrypted! You are advised to configure
your system so that Subversion can store passwords encrypted, if
possible. See the documentation for details.

You can avoid future appearances of this warning by setting the value
of the 'store-plaintext-passwords' option to either 'yes' or 'no' in
'/home/spark-rm/.subversion/servers'.
---
Store password unencrypted (yes/no)?
```

We can avoid it by adding `--no-auth-cache` when running the svn command.

## How was this patch tested?

manually verified with 2.4.0 RC5

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark svn

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22885.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22885

commit f7c63d52948c3f7c6bc6aa8ebed5ce8684403101
Author: Wenchen Fan
Date:   2018-10-19T14:16:42Z

    release script should not interrupt
[GitHub] spark issue #22885: [BUILD][MINOR] release script should not interrupt by sv...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22885

cc @vanzin @srowen @gatorsmile
[GitHub] spark pull request #22755: [SPARK-25755][SQL][Test] Supplementation of non-C...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22755#discussion_r229155463

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala ---
@@ -669,22 +669,20 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
     }
   }

-  Seq(true, false).foreach { codegen =>
+  withWholeStageCodegenOnAndOff { codegenEnabled =>
     test("SPARK-22951: dropDuplicates on empty dataFrames should produce correct aggregate " +
-      s"results when codegen is enabled: $codegen") {
-      withSQLConf((SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, codegen.toString)) {
-        // explicit global aggregations
-        val emptyAgg = Map.empty[String, String]
-        checkAnswer(spark.emptyDataFrame.agg(emptyAgg), Seq(Row()))
-        checkAnswer(spark.emptyDataFrame.groupBy().agg(emptyAgg), Seq(Row()))
-        checkAnswer(spark.emptyDataFrame.groupBy().agg(count("*")), Seq(Row(0)))
-        checkAnswer(spark.emptyDataFrame.dropDuplicates().agg(emptyAgg), Seq(Row()))
-        checkAnswer(spark.emptyDataFrame.dropDuplicates().groupBy().agg(emptyAgg), Seq(Row()))
-        checkAnswer(spark.emptyDataFrame.dropDuplicates().groupBy().agg(count("*")), Seq(Row(0)))
-
-        // global aggregation is converted to grouping aggregation:
-        assert(spark.emptyDataFrame.dropDuplicates().count() == 0)
-      }
+      s"results when codegen is enabled: $codegenEnabled") {

--- End diff --

we can add the test name postfix in `withWholeStageCodegenOnAndOff`, so that the caller side only needs to provide a base name, e.g.

```
def withWholeStageCodegenOnAndOff(testName: String)(f: String => Unit): Unit = {
  Seq("false", "true").foreach { enabled =>
    test(s"$testName (whole-stage-codegen ${if (enabled == "true") "on" else "off"})") {
      ...
    }
  }
}
```
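A call site under the suggested signature might then look like this (a sketch; assumes the usual QueryTest/SharedSQLContext mix-ins and `org.apache.spark.sql.functions.count`):

```scala
// The base name is passed once; the helper appends the on/off suffix itself
// and toggles spark.sql.codegen.wholeStage around the body.
withWholeStageCodegenOnAndOff("SPARK-22951: dropDuplicates on empty dataFrames " +
    "should produce correct aggregate results") { _ =>
  val emptyAgg = Map.empty[String, String]
  checkAnswer(spark.emptyDataFrame.agg(emptyAgg), Seq(Row()))
  checkAnswer(spark.emptyDataFrame.dropDuplicates().groupBy().agg(count("*")), Seq(Row(0)))
}
```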
[GitHub] spark pull request #22755: [SPARK-25755][SQL][Test] Supplementation of non-C...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22755#discussion_r229154981

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -459,6 +459,7 @@ private[spark] class Executor(
       threadMXBean.getCurrentThreadCpuTime
     } else 0L

+

--- End diff --

unnecessary change
[GitHub] spark pull request #22666: [SPARK-25672][SQL] schema_of_csv() - schema infer...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22666#discussion_r229153592

--- Diff: sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql ---
@@ -7,3 +7,11 @@ select from_csv('1', 'a InvalidType');
 select from_csv('1', 'a INT', named_struct('mode', 'PERMISSIVE'));
 select from_csv('1', 'a INT', map('mode', 1));
 select from_csv();
+-- infer schema of json literal
+select from_csv('1,abc', schema_of_csv('1,abc'));
+select schema_of_csv('1|abc', map('delimiter', '|'));
+select schema_of_csv(null);
+CREATE TEMPORARY VIEW csvTable(csvField, a) AS SELECT * FROM VALUES ('1,abc', 'a');
+SELECT schema_of_csv(csvField) FROM csvTable;
+-- Clean up
+DROP VIEW IF EXISTS csvTable;

--- End diff --

yea we need to clean up tables, as they are permanent. Actually I'm fine with it, as we clean up temp views in a lot of golden files. We can have another PR to remove these temp view clean-ups.
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r229151278

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -736,3 +736,60 @@ object CombineConcats extends Rule[LogicalPlan] {
     flattenConcats(concat)
   }
 }
+
+/**
+ * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further optimizations.
+ *
+ * This rule applies to conditions in [[Filter]] and [[Join]]. Moreover, it transforms predicates
+ * in all [[If]] expressions as well as branch conditions in all [[CaseWhen]] expressions.
+ *
+ * For example, `Filter(Literal(null, _))` is equal to `Filter(FalseLiteral)`.
+ *
+ * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`;
+ * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually
+ * `Filter(FalseLiteral)`.
+ *
+ * As this rule is not limited to conditions in [[Filter]] and [[Join]], arbitrary plans can
+ * benefit from it. For example, `Project(If(And(cond, Literal(null)), Literal(1), Literal(2)))`
+ * can be simplified into `Project(Literal(2))`.
+ *
+ * As a result, many unnecessary computations can be removed in the query optimization phase.
+ */
+object ReplaceNullWithFalse extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond))
+    case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond)))
+    case p: LogicalPlan => p transformExpressions {
+      case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred))
+      case cw @ CaseWhen(branches, _) =>
+        val newBranches = branches.map { case (cond, value) =>
+          replaceNullWithFalse(cond) -> value
+        }
+        cw.copy(branches = newBranches)
+    }
+  }
+
+  /**
+   * Recursively replaces `Literal(null, _)` with `FalseLiteral`.
+   *
+   * Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit
+   * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or `Literal(null, _)`.

--- End diff --

Can we make it more general? I think the expected expression is:

1. It's `NullIntolerant`: if any child is null, it will be null.
2. It has a null child.

So I would write something like:

```
case f @ Filter(cond, _) if alwaysNull(cond) => f.copy(condition = FalseLiteral)
...

def alwaysNull(e: Expression): Boolean = e match {
  case Literal(null, _) => true
  case n: NullIntolerant => n.children.exists(alwaysNull)
  case _ => false
}
```
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r229150341

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -736,3 +736,65 @@ object CombineConcats extends Rule[LogicalPlan] {
     flattenConcats(concat)
   }
 }
+
+/**
+ * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further optimizations.
+ *
+ * For example, `Filter(Literal(null, _))` is equal to `Filter(FalseLiteral)`.
+ *
+ * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`;
+ * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually
+ * `Filter(FalseLiteral)`.
+ *
+ * As a result, many unnecessary computations can be removed in the query optimization phase.
+ *
+ * Similarly, the same logic can be applied to conditions in [[Join]], predicates in [[If]],
+ * conditions in [[CaseWhen]].
+ */
+object ReplaceNullWithFalse extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond))
+    case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond)))
+    case p: LogicalPlan => p transformExpressions {
+      case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred))
+      case CaseWhen(branches, elseValue) =>
+        val newBranches = branches.map { case (cond, value) =>
+          replaceNullWithFalse(cond) -> value
+        }
+        CaseWhen(newBranches, elseValue)
+    }
+  }
+
+  /**
+   * Recursively replaces `Literal(null, _)` with `FalseLiteral`.
+   *
+   * Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit
+   * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or `Literal(null, _)`.
+   */
+  private def replaceNullWithFalse(e: Expression): Expression = e match {
+    case cw: CaseWhen if getValues(cw).forall(isNullOrBoolean) =>
+      val newBranches = cw.branches.map { case (cond, value) =>
+        replaceNullWithFalse(cond) -> replaceNullWithFalse(value)
+      }
+      val newElseValue = cw.elseValue.map(replaceNullWithFalse)
+      CaseWhen(newBranches, newElseValue)
+    case If(pred, trueVal, falseVal) if Seq(trueVal, falseVal).forall(isNullOrBoolean) =>
+      If(replaceNullWithFalse(pred), replaceNullWithFalse(trueVal), replaceNullWithFalse(falseVal))
+    case And(left, right) =>

--- End diff --

I don't have a particular case; this is just to double check that these corner cases are considered. I think we are fine now :)
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r229150101

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
@@ -2578,4 +2578,45 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     Row ("abc", 1))
   }
 }
+
+  test("SPARK-25860: Replace Literal(null, _) with FalseLiteral whenever possible") {
+
+    def checkPlanIsEmptyLocalScan(df: DataFrame): Unit = df.queryExecution.executedPlan match {

--- End diff --

yea we have. Take a look at `TestHive`, and we did something similar before:

```
// Disable ConvertToLocalRelation for better test coverage. Test cases built on
// LocalRelation will exercise the optimization rules better by disabling it as
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)))
```
[GitHub] spark issue #22309: [SPARK-20384][SQL] Support value class in schema of Data...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22309

My only concern is that the value class handling is kind of spread out in `ScalaReflection`. Maybe we need a better abstraction.
[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22309#discussion_r228925856

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -128,6 +128,15 @@ object ScalaReflection extends ScalaReflection {
     case _ => false
   }

+  def isValueClass(tpe: `Type`): Boolean = {
+    tpe.typeSymbol.asClass.isDerivedValueClass
+  }
+
+  /** Returns the name and type of the underlying parameter of value class `tpe`. */
+  def getUnderlyingParameterOf(tpe: `Type`): (String, Type) = {
+    getConstructorParameters(tpe).head

--- End diff --

is there a more official way to get the value class field name?
[GitHub] spark issue #22872: [SPARK-25864][SQL][TEST] Make main args accessible for B...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22872

thanks, merging to master!
[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22309#discussion_r228866822

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -180,11 +189,13 @@ object ScalaReflection extends ScalaReflection {
    * @param tpe The `Type` of deserialized object.
    * @param path The expression which can be used to extract serialized value.
    * @param walkedTypePath The paths from top to bottom to access current field when deserializing.
+   * @param instantiateValueClass If `true`, create an instance for Scala value class

--- End diff --

it would be good to explain when we need to instantiate the value class, and why
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r228825355

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---
@@ -45,6 +46,11 @@ case class CreateHiveTableAsSelectCommand(
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {

--- End diff --

I'm OK with that, since we do have 2 different ways to do Hive CTAS.
[GitHub] spark issue #22872: [SPARK-25864][SQL][TEST] Make main args accessible for B...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22872

add to whitelist
[GitHub] spark issue #22872: [SPARK-25864][SQL][TEST] Make main args accessible for B...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22872

ok to test
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r228816243

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---
@@ -45,6 +46,11 @@ case class CreateHiveTableAsSelectCommand(
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {

--- End diff --

then how about we create a special Hive CTAS command that follows the data source CTAS command but creates a Hive table?
[GitHub] spark issue #22865: [DOC] Fix doc for spark.sql.parquet.recordLevelFilter.en...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22865

thanks, merging to master/2.4/2.3!
[GitHub] spark pull request #22666: [SPARK-25672][SQL] schema_of_csv() - schema infer...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22666#discussion_r228787018

--- Diff: sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql ---
@@ -7,3 +7,11 @@ select from_csv('1', 'a InvalidType');
 select from_csv('1', 'a INT', named_struct('mode', 'PERMISSIVE'));
 select from_csv('1', 'a INT', map('mode', 1));
 select from_csv();
+-- infer schema of json literal
+select from_csv('1,abc', schema_of_csv('1,abc'));
+select schema_of_csv('1|abc', map('delimiter', '|'));
+select schema_of_csv(null);
+CREATE TEMPORARY VIEW csvTable(csvField, a) AS SELECT * FROM VALUES ('1,abc', 'a');
+SELECT schema_of_csv(csvField) FROM csvTable;
+-- Clean up
+DROP VIEW IF EXISTS csvTable;

--- End diff --

actually we don't need to clean up temp views. The golden file test is run with a fresh session.
[GitHub] spark pull request #22666: [SPARK-25672][SQL] schema_of_csv() - schema infer...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22666#discussion_r228786427

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala ---
@@ -19,14 +19,39 @@ package org.apache.spark.sql.catalyst.expressions

 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
-import org.apache.spark.sql.types.{MapType, StringType, StructType}
+import org.apache.spark.sql.types.{DataType, MapType, StringType, StructType}
+import org.apache.spark.unsafe.types.UTF8String

 object ExprUtils {

-  def evalSchemaExpr(exp: Expression): StructType = exp match {
-    case Literal(s, StringType) => StructType.fromDDL(s.toString)
+  def evalSchemaExpr(exp: Expression): StructType = {
+    // Use `DataType.fromDDL` since the type string can be struct<...>.
+    val dataType = exp match {
+      case Literal(s, StringType) =>
+        DataType.fromDDL(s.toString)
+      case e @ SchemaOfCsv(_: Literal, _) =>
+        val ddlSchema = e.eval(EmptyRow).asInstanceOf[UTF8String]
+        DataType.fromDDL(ddlSchema.toString)
+      case e => throw new AnalysisException(
+        "Schema should be specified in DDL format as a string literal or output of " +
+          s"the schema_of_csv function instead of ${e.sql}")
+    }
+
+    if (!dataType.isInstanceOf[StructType]) {
+      throw new AnalysisException(
+        s"Schema should be struct type but got ${dataType.sql}.")
+    }
+    dataType.asInstanceOf[StructType]
+  }
+
+  def evalTypeExpr(exp: Expression): DataType = exp match {
+    case Literal(s, StringType) => DataType.fromDDL(s.toString)

--- End diff --

we also need to update https://github.com/apache/spark/pull/22666/files#diff-5321c01e95bffc4413c5f3457696213eR157 in case the constant folding rule is disabled.
[GitHub] spark pull request #22666: [SPARK-25672][SQL] schema_of_csv() - schema infer...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22666#discussion_r228785835

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala ---
@@ -19,14 +19,39 @@ package org.apache.spark.sql.catalyst.expressions

 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
-import org.apache.spark.sql.types.{MapType, StringType, StructType}
+import org.apache.spark.sql.types.{DataType, MapType, StringType, StructType}
+import org.apache.spark.unsafe.types.UTF8String

 object ExprUtils {

-  def evalSchemaExpr(exp: Expression): StructType = exp match {
-    case Literal(s, StringType) => StructType.fromDDL(s.toString)
+  def evalSchemaExpr(exp: Expression): StructType = {
+    // Use `DataType.fromDDL` since the type string can be struct<...>.
+    val dataType = exp match {
+      case Literal(s, StringType) =>
+        DataType.fromDDL(s.toString)
+      case e @ SchemaOfCsv(_: Literal, _) =>
+        val ddlSchema = e.eval(EmptyRow).asInstanceOf[UTF8String]
+        DataType.fromDDL(ddlSchema.toString)
+      case e => throw new AnalysisException(
+        "Schema should be specified in DDL format as a string literal or output of " +
+          s"the schema_of_csv function instead of ${e.sql}")
+    }
+
+    if (!dataType.isInstanceOf[StructType]) {
+      throw new AnalysisException(
+        s"Schema should be struct type but got ${dataType.sql}.")
+    }
+    dataType.asInstanceOf[StructType]
+  }
+
+  def evalTypeExpr(exp: Expression): DataType = exp match {
+    case Literal(s, StringType) => DataType.fromDDL(s.toString)

--- End diff --

how about:

```
if (expr.isFoldable && expr.dataType == StringType) {
  DataType.fromDDL(expr.eval().asInstanceOf[UTF8String].toString)
}
```
[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22309#discussion_r228784274

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -373,6 +383,32 @@ object ScalaReflection extends ScalaReflection {
             dataType = ObjectType(udt.getClass))
         Invoke(obj, "deserialize", ObjectType(udt.userClass), path :: Nil)

+      case t if isValueClass(t) =>
+        val (_, underlyingType) = getUnderlyingParameterOf(t)
+        val underlyingClsName = getClassNameFromType(underlyingType)
+        val clsName = getUnerasedClassNameFromType(t)
+        val newTypePath = s"""- Scala value class: $clsName($underlyingClsName)""" +:
+          walkedTypePath
+
+        // Nested value class is treated as its underlying type
+        // because the compiler will convert value class in the schema to
+        // its underlying type.
+        // However, for value class that is top-level or array element,
+        // if it is used as another type (e.g. as its parent trait or generic),
+        // the compiler keeps the class so we must provide an instance of the
+        // class too. In other cases, the compiler will handle wrapping/unwrapping
+        // for us automatically.
+        val arg = deserializerFor(underlyingType, path, newTypePath, Some(t))
+        val isCollectionElement = lastType.exists { lt =>
+          lt <:< localTypeOf[Array[_]] || lt <:< localTypeOf[Seq[_]]
+        }
+        if (lastType.isEmpty || isCollectionElement) {

--- End diff --

it looks to me that we don't need `lastType`, but just a boolean parameter "needInstantiateValueClass".
[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22309#discussion_r228783542

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -373,6 +383,32 @@ object ScalaReflection extends ScalaReflection {
             dataType = ObjectType(udt.getClass))
         Invoke(obj, "deserialize", ObjectType(udt.userClass), path :: Nil)

+      case t if isValueClass(t) =>
+        val (_, underlyingType) = getUnderlyingParameterOf(t)
+        val underlyingClsName = getClassNameFromType(underlyingType)
+        val clsName = getUnerasedClassNameFromType(t)
+        val newTypePath = s"""- Scala value class: $clsName($underlyingClsName)""" +:
+          walkedTypePath
+
+        // Nested value class is treated as its underlying type
+        // because the compiler will convert value class in the schema to
+        // its underlying type.
+        // However, for value class that is top-level or array element,
+        // if it is used as another type (e.g. as its parent trait or generic),
+        // the compiler keeps the class so we must provide an instance of the
+        // class too. In other cases, the compiler will handle wrapping/unwrapping
+        // for us automatically.
+        val arg = deserializerFor(underlyingType, path, newTypePath, Some(t))
+        val isCollectionElement = lastType.exists { lt =>
+          lt <:< localTypeOf[Array[_]] || lt <:< localTypeOf[Seq[_]]

--- End diff --

how about map?
[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22309#discussion_r228783130

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -184,7 +193,8 @@ object ScalaReflection extends ScalaReflection {
   private def deserializerFor(
       tpe: `Type`,
       path: Expression,
-      walkedTypePath: Seq[String]): Expression = cleanUpReflectionObjects {
+      walkedTypePath: Seq[String],
+      lastType: Option[Type]): Expression = cleanUpReflectionObjects {

--- End diff --

can we add parameter doc for it?
[GitHub] spark pull request #21732: [SPARK-24762][SQL] Enable Option of Product encod...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21732#discussion_r228782980

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala ---
@@ -76,7 +76,7 @@ object TypedAggregateExpression {
         None,
         bufferSerializer,
         bufferEncoder.resolveAndBind().deserializer,
-        outputEncoder.serializer,
+        outputEncoder.objSerializer,

--- End diff --

to confirm, this is an unrelated change that just cleans up the code?
[GitHub] spark pull request #21732: [SPARK-24762][SQL] Enable Option of Product encod...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21732#discussion_r228782790

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala ---
@@ -362,4 +362,38 @@ class ScalaReflectionSuite extends SparkFunSuite {
     assert(numberOfCheckedArguments(deserializerFor[(java.lang.Double, Int)]) == 1)
     assert(numberOfCheckedArguments(deserializerFor[(java.lang.Integer, java.lang.Integer)]) == 0)
   }
+
+  test("SPARK-24762: serializer for Option of Product") {

--- End diff --

do we need to add tests here? `ScalaReflection` is not updated in this PR.
[GitHub] spark pull request #21732: [SPARK-24762][SQL] Enable Option of Product encod...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21732#discussion_r228782670

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -207,7 +198,7 @@ case class ExpressionEncoder[T](
   val serializer: Seq[NamedExpression] = {
     val clsName = Utils.getSimpleName(clsTag.runtimeClass)

-    if (isSerializedAsStruct) {
+    if (isSerializedAsStruct && !classOf[Option[_]].isAssignableFrom(clsTag.runtimeClass)) {

--- End diff --

can we make sure that other places calling `isSerializedAsStruct` don't need to check the Option type?
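One way to make that audit easier would be a dedicated predicate next to `isSerializedAsStruct`, so the top-level Option special case lives in a single place (a sketch; the helper name is illustrative):

```scala
// True when T serializes as a struct AND is not a top-level Option,
// i.e. exactly the cases that need a multi-field serializer.
def isSerializedAsStructForTopLevel: Boolean =
  isSerializedAsStruct && !classOf[Option[_]].isAssignableFrom(clsTag.runtimeClass)
```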
[GitHub] spark pull request #21732: [SPARK-24762][SQL] Enable Option of Product encod...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21732#discussion_r228782536

--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -17,6 +17,8 @@ displayTitle: Spark SQL Upgrading Guide

   - The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set.

+  - In Spark version 2.4 and earlier, `Dataset` doesn't support to encode `Option[Product]` at top-level row, because in Spark SQL entire top-level row can't be null. Since Spark 3.0, `Option[Product]` at top-level is encoded as a row with single struct column. Then with this support, `Aggregator` can also use use `Option[Product]` as buffer and output column types.

--- End diff --

Usually we only add a migration guide entry if something is broken and users must be aware of it when upgrading. I think this one is not the case? It's just a new feature.
[GitHub] spark pull request #22788: [SPARK-25769][SQL]escape nested columns by backti...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22788#discussion_r228781145

--- Diff: sql/core/src/test/resources/sql-tests/results/columnresolution-negative.sql.out ---
@@ -161,7 +161,7 @@ SELECT db1.t1.i1 FROM t1, mydb2.t1
 struct<>
 -- !query 18 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`db1.t1.i1`' given input columns: [mydb2.t1.i1, mydb2.t1.i1]; line 1 pos 7
+cannot resolve '`db1`.`t1`.`i1`' given input columns: [mydb2.t1.i1, mydb2.t1.i1]; line 1 pos 7

--- End diff --

do you think we should just make `sql` same as `name`? It looks to me that `'db1.t1.i1'` is better than `` '`db1`.`t1`.`i1`' ``, as it's more compact and is not ambiguous.
[GitHub] spark pull request #22755: [SPARK-25755][SQL][Test] Supplementation of non-C...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22755#discussion_r228780582

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala ---
@@ -166,6 +167,17 @@ private[sql] trait SQLTestUtilsBase
     super.withSQLConf(pairs: _*)(f)
   }

+  /**
+   * A helper function for turning off/on codegen.
+   */
+  protected def withCodegenTurnOffAndOn(f: String => Unit): Unit = {

--- End diff --

nit: `withWholeStageCodegenOnAndOff`
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r228780430

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---
@@ -45,6 +46,11 @@ case class CreateHiveTableAsSelectCommand(
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {

--- End diff --

Some more thoughts: `CreateHiveTableAsSelectCommand` just runs another command, so we will not get any metric for this plan node. It's OK if we use the Hive writer, as we indeed can't get any metrics (the writing is done by Hive). However, if we can convert and use Spark's native writer, we do have metrics. I think a better fix is to replace Hive CTAS with data source CTAS during optimization.
[GitHub] spark issue #22514: [SPARK-25271][SQL] Hive ctas commands should use data so...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22514

It's definitely not a blocker, and we don't need to hold RC5 because of it. I think it needs a little more review, and I'm going to cut RC5 today (2.4.0 has already been much delayed), so it's very likely we can't get it into 2.4.0.
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r228779505

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
@@ -2578,4 +2578,45 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     Row ("abc", 1))
   }
 }
+
+  test("SPARK-25860: Replace Literal(null, _) with FalseLiteral whenever possible") {
+
+    def checkPlanIsEmptyLocalScan(df: DataFrame): Unit = df.queryExecution.executedPlan match {

--- End diff --

this assumes we run `ConvertToLocalRelation`; let's use `withSQLConf` to make sure this rule is on.
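A sketch of the suggested guard, assuming the suite mixes in `SQLTestUtils`: clearing the excluded-rules conf for the scope of the assertions guarantees `ConvertToLocalRelation` runs even if the test session excludes it by default.

```scala
// Empty excludedRules means no optimizer rule is disabled for this scope.
withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> "") {
  val q1 = df1.where("IF(l > 10, false, b AND null)")
  checkAnswer(q1, Seq.empty)
  checkPlanIsEmptyLocalScan(q1) // relies on ConvertToLocalRelation having fired
}
```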
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r228779276

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -736,3 +736,65 @@ object CombineConcats extends Rule[LogicalPlan] {
     flattenConcats(concat)
   }
 }
+
+/**
+ * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further optimizations.
+ *
+ * For example, `Filter(Literal(null, _))` is equal to `Filter(FalseLiteral)`.
+ *
+ * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`;
+ * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually
+ * `Filter(FalseLiteral)`.
+ *
+ * As a result, many unnecessary computations can be removed in the query optimization phase.
+ *
+ * Similarly, the same logic can be applied to conditions in [[Join]], predicates in [[If]],
+ * conditions in [[CaseWhen]].
+ */
+object ReplaceNullWithFalse extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond))
+    case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond)))
+    case p: LogicalPlan => p transformExpressions {
+      case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred))
+      case CaseWhen(branches, elseValue) =>
+        val newBranches = branches.map { case (cond, value) =>
+          replaceNullWithFalse(cond) -> value
+        }
+        CaseWhen(newBranches, elseValue)
+    }
+  }
+
+  /**
+   * Recursively replaces `Literal(null, _)` with `FalseLiteral`.
+   *
+   * Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit
+   * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or `Literal(null, _)`.
+   */
+  private def replaceNullWithFalse(e: Expression): Expression = e match {
+    case cw: CaseWhen if getValues(cw).forall(isNullOrBoolean) =>
+      val newBranches = cw.branches.map { case (cond, value) =>
+        replaceNullWithFalse(cond) -> replaceNullWithFalse(value)
+      }
+      val newElseValue = cw.elseValue.map(replaceNullWithFalse)
+      CaseWhen(newBranches, newElseValue)
+    case If(pred, trueVal, falseVal) if Seq(trueVal, falseVal).forall(isNullOrBoolean) =>
+      If(replaceNullWithFalse(pred), replaceNullWithFalse(trueVal), replaceNullWithFalse(falseVal))
+    case And(left, right) =>

--- End diff --

we need to be careful here: `null && false` is false, `null || true` is true. Please take a look at https://github.com/apache/spark/pull/22702
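The corner case being flagged is SQL's three-valued logic: `null AND false` is `false` and `null OR true` is `true`, so a null literal is not blindly interchangeable with `FalseLiteral` inside `And`/`Or`. A quick way to confirm the semantics in a spark-shell session (expected result in the comment):

```scala
// In SQL, x AND y is false if either side is false, even when the other is null;
// x OR y is true if either side is true, even when the other is null.
spark.sql("SELECT (NULL AND FALSE) AS a, (NULL OR TRUE) AS b").collect()
// => Array(Row(false, true))
```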
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r228779125

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -736,3 +736,65 @@ object CombineConcats extends Rule[LogicalPlan] {
     flattenConcats(concat)
   }
 }
+
+/**
+ * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further optimizations.
+ *
+ * For example, `Filter(Literal(null, _))` is equal to `Filter(FalseLiteral)`.
+ *
+ * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`;
+ * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually
+ * `Filter(FalseLiteral)`.
+ *
+ * As a result, many unnecessary computations can be removed in the query optimization phase.
+ *
+ * Similarly, the same logic can be applied to conditions in [[Join]], predicates in [[If]],
+ * conditions in [[CaseWhen]].
+ */
+object ReplaceNullWithFalse extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond))
+    case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond)))
+    case p: LogicalPlan => p transformExpressions {
+      case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred))
+      case CaseWhen(branches, elseValue) =>
+        val newBranches = branches.map { case (cond, value) =>
+          replaceNullWithFalse(cond) -> value
+        }
+        CaseWhen(newBranches, elseValue)
+    }
+  }
+
+  /**
+   * Recursively replaces `Literal(null, _)` with `FalseLiteral`.
+   *
+   * Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit
+   * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or `Literal(null, _)`.
+   */
+  private def replaceNullWithFalse(e: Expression): Expression = e match {
+    case cw: CaseWhen if getValues(cw).forall(isNullOrBoolean) =>

--- End diff --

this applies to `If` as well.
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r228779097

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -736,3 +736,65 @@ object CombineConcats extends Rule[LogicalPlan] {
     flattenConcats(concat)
   }
 }
+
+/**
+ * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further optimizations.
+ *
+ * For example, `Filter(Literal(null, _))` is equal to `Filter(FalseLiteral)`.
+ *
+ * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`;
+ * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually
+ * `Filter(FalseLiteral)`.
+ *
+ * As a result, many unnecessary computations can be removed in the query optimization phase.
+ *
+ * Similarly, the same logic can be applied to conditions in [[Join]], predicates in [[If]],
+ * conditions in [[CaseWhen]].
+ */
+object ReplaceNullWithFalse extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond))
+    case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond)))
+    case p: LogicalPlan => p transformExpressions {
+      case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred))
+      case CaseWhen(branches, elseValue) =>
+        val newBranches = branches.map { case (cond, value) =>
+          replaceNullWithFalse(cond) -> value
+        }
+        CaseWhen(newBranches, elseValue)
+    }
+  }
+
+  /**
+   * Recursively replaces `Literal(null, _)` with `FalseLiteral`.
+   *
+   * Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit
+   * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or `Literal(null, _)`.
+   */
+  private def replaceNullWithFalse(e: Expression): Expression = e match {
+    case cw: CaseWhen if getValues(cw).forall(isNullOrBoolean) =>

--- End diff --

actually just `cw.dataType == BooleanType`. If an expression is `NullType`, it should be replaced by a null literal already.
[GitHub] spark pull request #22857: [SPARK-25860][SQL] Replace Literal(null, _) with ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22857#discussion_r228779010

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -736,3 +736,65 @@ object CombineConcats extends Rule[LogicalPlan] {
     flattenConcats(concat)
   }
 }
+
+/**
+ * A rule that replaces `Literal(null, _)` with `FalseLiteral` for further optimizations.
+ *
+ * For example, `Filter(Literal(null, _))` is equal to `Filter(FalseLiteral)`.
+ *
+ * Another example containing branches is `Filter(If(cond, FalseLiteral, Literal(null, _)))`;
+ * this can be optimized to `Filter(If(cond, FalseLiteral, FalseLiteral))`, and eventually
+ * `Filter(FalseLiteral)`.
+ *
+ * As a result, many unnecessary computations can be removed in the query optimization phase.
+ *
+ * Similarly, the same logic can be applied to conditions in [[Join]], predicates in [[If]],
+ * conditions in [[CaseWhen]].
+ */
+object ReplaceNullWithFalse extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond))
+    case j @ Join(_, _, _, Some(cond)) => j.copy(condition = Some(replaceNullWithFalse(cond)))
+    case p: LogicalPlan => p transformExpressions {
+      case i @ If(pred, _, _) => i.copy(predicate = replaceNullWithFalse(pred))
+      case CaseWhen(branches, elseValue) =>
+        val newBranches = branches.map { case (cond, value) =>
+          replaceNullWithFalse(cond) -> value
+        }
+        CaseWhen(newBranches, elseValue)
+    }
+  }
+
+  /**
+   * Recursively replaces `Literal(null, _)` with `FalseLiteral`.
+   *
+   * Note that `transformExpressionsDown` can not be used here as we must stop as soon as we hit
+   * an expression that is not [[CaseWhen]], [[If]], [[And]], [[Or]] or `Literal(null, _)`.
+   */
+  private def replaceNullWithFalse(e: Expression): Expression = e match {
+    case cw: CaseWhen if getValues(cw).forall(isNullOrBoolean) =>

--- End diff --

how about `cw.dataType == BooleanType || cw.dataType == NullType`?
[GitHub] spark pull request #22865: [DOC] Fix doc for spark.sql.parquet.recordLevelFi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22865#discussion_r228776973

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -462,7 +462,7 @@ object SQLConf {
   val PARQUET_RECORD_FILTER_ENABLED = buildConf("spark.sql.parquet.recordLevelFilter.enabled")
     .doc("If true, enables Parquet's native record-level filtering using the pushed down " +
       "filters. This configuration only has an effect when 'spark.sql.parquet.filterPushdown' " +
-      "is enabled.")
+      "is enabled and spark.sql.parquet.enableVectorizedReader is disabled.")
--- End diff --

SGTM
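For reference, the combination the corrected doc text describes can be set up as below; the config keys are the real SQLConf names, and the values are just one possible configuration:

```scala
// Record-level filtering only takes effect when filter pushdown is on and
// the vectorized reader is off, per the doc fix above.
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.parquet.recordLevelFilter.enabled", "true")
```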
[GitHub] spark issue #22817: [SPARK-25816][SQL] Fix attribute resolution in nested ex...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22817

RC5 will have this fix
[GitHub] spark issue #22812: [SPARK-25817][SQL] Dataset encoder should support combin...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22812

thanks, merging to master!
[GitHub] spark issue #22309: [SPARK-20384][SQL] Support value class in schema of Data...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22309

try to build sql/core?
[GitHub] spark pull request #22858: [SPARK-24709][SQL][2.4] map basestring to str for...
Github user cloud-fan closed the pull request at: https://github.com/apache/spark/pull/22858
[GitHub] spark issue #22858: [SPARK-24709][SQL][2.4] map basestring to str for python...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22858

title updated
[GitHub] spark pull request #22817: [SPARK-25816][SQL] Fix attribute resolution in ne...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22817#discussion_r228731805

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
@@ -2578,4 +2578,12 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       Row ("abc", 1))
     }
   }
+
+  test("SPARK-25816 ResolveReferences works with nested extractors") {
+    val df0 = Seq((1, Map(1 -> "a")), (2, Map(2 -> "b"))).toDF("1", "2")
+    val df1 = df0.select($"1".as("2"), $"2".as("1"))
+    val df2 = df1.filter($"1"(map_keys($"1")(0)) > "a")
--- End diff --

+1, I think the test can be simplified:
```
val df = Seq((1, Map(1 -> "a")), (2, Map(2 -> "b"))).toDF("key", "map")
checkAnswer(df.select($"map"($"key")), Row("a") :: Row("b") :: Nil)
```
[GitHub] spark pull request #22817: [SPARK-25816][SQL] Fix attribute resolution in ne...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22817#discussion_r228731772

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
@@ -2578,4 +2578,12 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       Row ("abc", 1))
     }
   }
+
+  test("SPARK-25816 ResolveReferences works with nested extractors") {
+    val df0 = Seq((1, Map(1 -> "a")), (2, Map(2 -> "b"))).toDF("1", "2")
--- End diff --

can we use normal names like `i` and `j` instead of 1 and 2?
[GitHub] spark issue #22809: [SPARK-19851][SQL] Add support for EVERY and ANY (SOME) ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22809

thanks, merging to master!
[GitHub] spark issue #22853: [SPARK-25845][SQL] Fix MatchError for calendar interval ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22853

LGTM. Do we target it to 2.4? The API in 2.4 is deprecated, so I'm not sure if we still need to backport bug fixes.
[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22309#discussion_r228731247

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -379,6 +388,28 @@ object ScalaReflection extends ScalaReflection {
           dataType = ObjectType(udt.getClass))
         Invoke(obj, "deserialize", ObjectType(udt.userClass), getPath :: Nil)

+      case t if isValueClass(t) =>
+        val (_, underlyingType) = getUnderlyingParameterOf(t)
+        val underlyingClsName = getClassNameFromType(underlyingType)
+        val clsName = getUnerasedClassNameFromType(t)
+        val newTypePath = s"""- Scala value class: $clsName($underlyingClsName)""" +:
+          walkedTypePath
+
+        // Nested value class is treated as its underlying type
+        // because the compiler will convert value class in the schema to
+        // its underlying type.
+        // However, for top-level value class, if it is used as another type
+        // (e.g. as its parent trait or generic), the compiler keeps the class
+        // so we must provide an instance of the class too. In other cases,
+        // the compiler will handle wrapping/unwrapping for us automatically.
+        val arg = deserializerFor(underlyingType, path, newTypePath)
+        if (path.isDefined) {
--- End diff --

did you rebase? I think `path` is not an `Option` anymore.
[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22309#discussion_r228731209

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala ---
@@ -297,11 +307,16 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTes
     ExpressionEncoder.tuple(intEnc, ExpressionEncoder.tuple(intEnc, longEnc))
   }

+  // test for Scala value class
   encodeDecodeTest(
     PrimitiveValueClass(42), "primitive value class")
-  encodeDecodeTest(
     ReferenceValueClass(ReferenceValueClass.Container(1)), "reference value class")
+  encodeDecodeTest(StringWrapper("a"), "value class string")
+  encodeDecodeTest(ValueContainer(1, StringWrapper("b")), "value class nested")
+  encodeDecodeTest(
--- End diff --

can we also test with null values?
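One hypothetical way to add the requested coverage, reusing the suite's `encodeDecodeTest` helper and the `StringWrapper` type from the diff (a sketch, not code from the PR):

```scala
// Hypothetical test: a value class whose underlying String is null, so the
// serializer's null handling for the wrapped field is exercised.
encodeDecodeTest(StringWrapper(null), "value class with null underlying value")
```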
[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22309#discussion_r228730753

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala ---
@@ -358,4 +368,20 @@ class ScalaReflectionSuite extends SparkFunSuite {
     assert(numberOfCheckedArguments(deserializerFor[(java.lang.Double, Int)]) == 1)
     assert(numberOfCheckedArguments(deserializerFor[(java.lang.Integer, java.lang.Integer)]) == 0)
   }
+
+  test("schema for case class that is a value class") {
+    val schema = schemaFor[TestingValueClass.IntWrapper]
+    assert(schema === Schema(IntegerType, nullable = false))
+  }
+
+  test("schema for case class that contains value class fields") {
+    val schema = schemaFor[TestingValueClass.ValueClassData]
+    assert(schema === Schema(
+      StructType(Seq(
+        StructField("intField", IntegerType, nullable = false),
+        StructField("wrappedInt", IntegerType, nullable = false),
--- End diff --

to confirm: a Scala value class over a primitive type can't be null?
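At the language level it can't, and that needs no Spark to demonstrate: a value class over a primitive erases to the bare primitive, and the compiler rejects null for it, which is why `nullable = false` is correct above. A self-contained sketch:

```scala
// A value class wrapping an Int is stored as an unboxed Int at runtime,
// and null does not conform to an AnyVal subtype at compile time.
final case class IntWrapper(i: Int) extends AnyVal

object ValueClassNullDemo extends App {
  println(IntWrapper(42).i) // prints 42; no boxing, no null case
  // val bad: IntWrapper = null // does not compile
}
```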
[GitHub] spark issue #22812: [SPARK-25817][SQL] Dataset encoder should support combin...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22812

retest this please
[GitHub] spark issue #22858: [SPARK-24709][SQL][2.4] use str instead of basestring in...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22858

@HyukjinKwon thanks for the information! Shall we replace `str` with `basestring` in `functions.py` for the master branch?
[GitHub] spark pull request #22858: [SPARK-24709][SQL][2.4] use str instead of basest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22858#discussion_r228730582

--- Diff: python/pyspark/sql/functions.py ---
@@ -2326,7 +2326,7 @@ def schema_of_json(json):
     >>> df.select(schema_of_json('{"a": 0}').alias("json")).collect()
     [Row(json=u'struct<a:bigint>')]
     """
-    if isinstance(json, basestring):
+    if isinstance(json, str):
--- End diff --

shall we apply it to 2.4? I'm not aware of the background: why did we not put
```
if sys.version >= '3':
    basestring = str
```
in 2.4?
[GitHub] spark issue #22858: [SPARK-24709][SQL][2.4] use str instead of basestring in...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22858

BTW the `from_csv` added in 3.0 also uses `basestring`; maybe we should update it as well in the master branch.
[GitHub] spark issue #22858: [SPARK-24709][SQL][2.4] use str instead of basestring in...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22858

cc @HyukjinKwon @gatorsmile
[GitHub] spark pull request #22858: [SPARK-24709][SQL][2.4] use str instead of basest...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/22858

[SPARK-24709][SQL][2.4] use str instead of basestring

## What changes were proposed in this pull request?

After backporting https://github.com/apache/spark/pull/22775 to 2.4, the 2.4 sbt Jenkins QA job is broken, see https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test/job/spark-branch-2.4-test-sbt-hadoop-2.7/147/console

I checked all the `isinstance` calls in `functions.py`; all of them use `str` to check the string type. I don't know why `basestring` works in the master and 2.4 maven builds, but it's safer to follow the existing code.

## How was this patch tested?

existing test

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark python

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22858.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22858

commit 2917acd18994c3901c8c5b562cf87964bca879d9
Author: Wenchen Fan
Date: 2018-10-27T11:12:10Z

    use str instead of basestring
[GitHub] spark pull request #22809: [SPARK-19851][SQL] Add support for EVERY and ANY ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22809#discussion_r228709483

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/UnevaluableAggs.scala ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+abstract class UnevaluableBooleanAggBase(arg: Expression)
--- End diff --

We can leave a TODO saying that we should create a framework to replace aggregate functions, but I think the current patch is good enough for these 3 functions, and I'm not aware of more functions like them that we need to deal with.
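Background for why these placeholders can stay unevaluable: the plan side is expected to rewrite them into existing aggregates, since on booleans `false < true` makes EVERY behave like MIN and ANY/SOME like MAX. A hedged illustration, assuming a `spark` session (not part of the patch):

```scala
// min(b) plays the role of EVERY(b), max(b) the role of ANY(b)/SOME(b).
spark.sql("SELECT min(b), max(b) FROM VALUES (true), (false), (true) AS t(b)")
  .show()
// min(b) = false, i.e. EVERY is false; max(b) = true, i.e. ANY/SOME is true
```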
[GitHub] spark pull request #22854: [SPARK-25854] fix mvn to not always exit 1
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22854#discussion_r228606581

--- Diff: build/mvn ---
@@ -163,8 +163,14 @@ export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"}

 echo "Using \`mvn\` from path: $MVN_BIN" 1>&2

-# Last, call the `mvn` command as usual
+# call the `mvn` command as usual
 "${MVN_BIN}" -DzincPort=${ZINC_PORT} "$@"
+MVN_RETCODE=$?

-# Try to shut down zinc explicitly
-"${ZINC_BIN}" -shutdown -port ${ZINC_PORT}
+# SPARK-25854
+# Try to shut down zinc explicitly if the server is still running. if it's not running,
+# it's timed out and we'll still need to exit the script w/a 0 to keep the build from
+# failing.
+"${ZINC_BIN}" -shutdown -port ${ZINC_PORT} || true
--- End diff --

do we still need `|| true`? we always return `$MVN_RETCODE` now.
[GitHub] spark pull request #22854: [SPARK-25854] fix mvn to not always exit 1
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22854#discussion_r228588061

--- Diff: build/mvn ---
@@ -163,8 +163,19 @@ export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"}

 echo "Using \`mvn\` from path: $MVN_BIN" 1>&2

-# Last, call the `mvn` command as usual
+# call the `mvn` command as usual
 "${MVN_BIN}" -DzincPort=${ZINC_PORT} "$@"

-# Try to shut down zinc explicitly
-"${ZINC_BIN}" -shutdown -port ${ZINC_PORT}
+# check to see if zinc server is still running post-build
+"${ZINC_BIN}" -status -port ${ZINC_PORT} &> /dev/null
+ZINC_STATUS=$?
+
+# Try to shut down zinc explicitly if the server is still running
--- End diff --

I know it's very unlikely, but there is a chance that zinc times out between the status check and the shutdown. Since zinc will time out eventually anyway, we don't care too much about whether we manage to shut it down here. So how about `"${ZINC_BIN}" -shutdown -port ${ZINC_PORT} || true`?
[GitHub] spark issue #22775: [SPARK-24709][SQL][FOLLOW-UP] Make schema_of_json's inpu...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22775

Actually this is not that hard. The conflict comes from the fact that in 2.4 `schema_of_json` doesn't take an `option` parameter. I've fixed the conflict and pushed to 2.4. You can take a look at the commit and see if there is something wrong. I ran the touched tests locally to verify it.
[GitHub] spark issue #22775: [SPARK-24709][SQL][FOLLOW-UP] Make schema_of_json's inpu...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22775

thanks, merging to master! Can you send a new PR for 2.4? It conflicts.
[GitHub] spark pull request #22775: [SPARK-24709][SQL][FOLLOW-UP] Make schema_of_json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22775#discussion_r228499222

--- Diff: python/pyspark/sql/functions.py ---
@@ -2365,30 +2365,32 @@ def to_json(col, options={}):

 @ignore_unicode_prefix
 @since(2.4)
-def schema_of_json(col, options={}):
+def schema_of_json(json, options={}):
     """
-    Parses a column containing a JSON string and infers its schema in DDL format.
+    Parses a JSON string and infers its schema in DDL format.

-    :param col: string column in json format
+    :param json: a JSON string or a string literal containing a JSON string.
     :param options: options to control parsing. accepts the same options as the JSON datasource

     .. versionchanged:: 3.0
        It accepts `options` parameter to control schema inferring.

-    >>> from pyspark.sql.types import *
-    >>> data = [(1, '{"a": 1}')]
-    >>> df = spark.createDataFrame(data, ("key", "value"))
-    >>> df.select(schema_of_json(df.value).alias("json")).collect()
-    [Row(json=u'struct<a:bigint>')]
+    >>> df = spark.range(1)
     >>> df.select(schema_of_json(lit('{"a": 0}')).alias("json")).collect()
     [Row(json=u'struct<a:bigint>')]
-    >>> schema = schema_of_json(lit('{a: 1}'), {'allowUnquotedFieldNames':'true'})
+    >>> schema = schema_of_json('{a: 1}', {'allowUnquotedFieldNames':'true'})
     >>> df.select(schema.alias("json")).collect()
     [Row(json=u'struct<a:bigint>')]
     """
+    if isinstance(json, basestring):
--- End diff --

After more thought, maybe we should not add new features to 2.4? We can accept strings directly in 3.0.
[GitHub] spark issue #22846: [SPARK-25797][SQL][DOCS] Add migration doc for solving i...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22846

LGTM, merging to master/2.4! It conflicts with 2.3; can you send a new PR? thanks!
[GitHub] spark pull request #22847: [SPARK-25850][SQL] Make the split threshold for t...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22847#discussion_r228483780

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -812,6 +812,17 @@ object SQLConf {
     .intConf
     .createWithDefault(65535)

+  val CODEGEN_METHOD_SPLIT_THRESHOLD = buildConf("spark.sql.codegen.methodSplitThreshold")
+    .internal()
+    .doc("The maximum source code length of a single Java function by codegen. When the " +
+      "generated Java function source code exceeds this threshold, it will be split into " +
+      "multiple small functions, each function length is spark.sql.codegen.methodSplitThreshold." +
--- End diff --

`each function length is spark.sql.codegen.methodSplitThreshold` is not true; the method size is always larger than the threshold. cc @kiszk any idea about the naming and description of this config?
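If the config lands under this name, tuning it would presumably look like the sketch below; hedged, since the entry is `.internal()` and both its name and wording are still under discussion in this thread:

```scala
// Lower the codegen split threshold so generated Java methods are split
// earlier (config key taken from the diff above, not from a released Spark).
spark.conf.set("spark.sql.codegen.methodSplitThreshold", "1024")
```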
[GitHub] spark pull request #22029: [SPARK-24395][SQL] IN operator should return NULL...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22029#discussion_r228482085

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -212,27 +212,27 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces [[In (values, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
-      case In(v, list) if list.isEmpty =>
+      case i @ In(_, list) if list.isEmpty =>
         // When v is not nullable, the following expression will be optimized
         // to FalseLiteral which is tested in OptimizeInSuite.scala
-        If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
-      case expr @ In(v, list) if expr.inSetConvertible =>
+        If(IsNotNull(i.value), FalseLiteral, Literal(null, BooleanType))
+      case expr @ In(_, list) if expr.inSetConvertible =>
         val newList = ExpressionSet(list).toSeq
         if (newList.length == 1
           // TODO: `EqualTo` for structural types are not working. Until SPARK-24443 is addressed,
           // TODO: we exclude them in this rule.
-          && !v.isInstanceOf[CreateNamedStructLike]
+          && !expr.value.isInstanceOf[CreateNamedStructLike]
--- End diff --

```
@transient protected lazy val isMultiValued = values.length > 1

@transient lazy val value: Expression = if (isMultiValued) {
  CreateNamedStruct(values.zipWithIndex.flatMap {
    case (v: NamedExpression, _) => Seq(Literal(v.name), v)
    case (v, idx) => Seq(Literal(s"_$idx"), v)
  })
} else {
  values.head
}
```

According to the implementation, `expr.value.isInstanceOf[CreateNamedStructLike]` means `expr.values.length > 1`, right?
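A DataFrame-level analogy of what that `lazy val` builds (my own sketch, not from the PR): a multi-column IN compares the whole tuple as one struct value, so a single-element list reduces to a struct equality.

```scala
import org.apache.spark.sql.functions.{lit, struct}

// Roughly what (1, 2) IN ((1, 2)) collapses to after OptimizeIn: an
// EqualTo between two named structs.
val df = spark.range(1)
df.select(
  struct(lit(1).as("a"), lit(2).as("b")) ===
    struct(lit(1).as("a"), lit(2).as("b"))).show() // true
```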
[GitHub] spark pull request #22029: [SPARK-24395][SQL] IN operator should return NULL...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22029#discussion_r228441256

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -212,27 +212,27 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces [[In (values, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
-      case In(v, list) if list.isEmpty =>
+      case i @ In(_, list) if list.isEmpty =>
        // When v is not nullable, the following expression will be optimized
        // to FalseLiteral which is tested in OptimizeInSuite.scala
-        If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
-      case expr @ In(v, list) if expr.inSetConvertible =>
+        If(IsNotNull(i.value), FalseLiteral, Literal(null, BooleanType))
+      case expr @ In(_, list) if expr.inSetConvertible =>
         val newList = ExpressionSet(list).toSeq
         if (newList.length == 1
           // TODO: `EqualTo` for structural types are not working. Until SPARK-24443 is addressed,
           // TODO: we exclude them in this rule.
-          && !v.isInstanceOf[CreateNamedStructLike]
+          && !expr.value.isInstanceOf[CreateNamedStructLike]
           && !newList.head.isInstanceOf[CreateNamedStructLike]) {
-          EqualTo(v, newList.head)
+          EqualTo(expr.value, newList.head)
--- End diff --

shall we update the match here? I think it should be `In(Seq(value), ...)` now
[GitHub] spark pull request #21860: [SPARK-24901][SQL]Merge the codegen of RegularHas...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21860#discussion_r228434359

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -831,7 +832,14 @@ case class HashAggregateExec(
     ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input

     val updateRowInRegularHashMap: String = {
-      ctx.INPUT_ROW = unsafeRowBuffer
+      val updatedTmpAggBuffer =
+        if (isFastHashMapEnabled && !isVectorizedHashMapEnabled) {
+          updatedAggBuffer
--- End diff --

Did you update? This is not what I proposed...
[GitHub] spark issue #22775: [SPARK-24709][SQL][FOLLOW-UP] Make schema_of_json's inpu...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22775

seems like a real test failure
[GitHub] spark pull request #22823: [SPARK-25676][SQL][TEST] Improve BenchmarkWideTab...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22823#discussion_r228409979

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkWideTable.scala ---
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.benchmark
-
-import org.apache.spark.benchmark.Benchmark
-
-/**
- * Benchmark to measure performance for wide table.
- * To run this:
- *  build/sbt "sql/test-only *benchmark.BenchmarkWideTable"
- *
- * Benchmarks in this file are skipped in normal builds.
- */
-class BenchmarkWideTable extends BenchmarkWithCodegen {
-
-  ignore("project on wide table") {
-    val N = 1 << 20
-    val df = sparkSession.range(N)
-    val columns = (0 until 400).map{ i => s"id as id$i"}
-    val benchmark = new Benchmark("projection on wide table", N)
-    benchmark.addCase("wide table", numIters = 5) { iter =>
-      df.selectExpr(columns : _*).queryExecution.toRdd.count()
-    }
-    benchmark.run()
-
-    /**
-     * Here are some numbers with different split threshold:
-     *
-     * Split threshold   methods   Rate(M/s)   Per Row(ns)
-     * 10                400       0.4         2279
-     * 100               200       0.6         1554
-     * 1k                37        0.9         1116
--- End diff --

I think we should have a PR to add this config officially. It should be useful for performance tuning.
[GitHub] spark issue #22841: [SPARK-25842][SQL] Deprecate rangeBetween APIs introduce...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22841

thanks, merging to master/2.4!
[GitHub] spark issue #22790: [SPARK-25793][ML]call SaveLoadV2_0.load for classNameV2_...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22790

thanks, merging to master/2.4!
[GitHub] spark pull request #22029: [SPARK-24395][SQL] IN operator should return NULL...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22029#discussion_r228393734

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1561,6 +1561,16 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)

+  val LEGACY_IN_FALSE_FOR_NULL_FIELD =
+    buildConf("spark.sql.legacy.inOperator.falseForNullField")
+      .internal()
+      .doc("When set to true (default), the IN operator returns false when comparing literal " +
--- End diff --

we should mention that it only applies to multi-value IN.
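A hedged usage sketch of the flag as written in this diff (the key exists only in the patch under review and may never ship in this form):

```scala
// Keep the legacy pre-SPARK-24395 behaviour: multi-value IN returns false
// instead of null when a field of a compared element is null.
spark.conf.set("spark.sql.legacy.inOperator.falseForNullField", "true")
```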
[GitHub] spark pull request #22029: [SPARK-24395][SQL] IN operator should return NULL...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22029#discussion_r228393417

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -212,27 +212,27 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces [[In (values, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
-      case In(v, list) if list.isEmpty =>
+      case i @ In(_, list) if list.isEmpty =>
         // When v is not nullable, the following expression will be optimized
         // to FalseLiteral which is tested in OptimizeInSuite.scala
-        If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
-      case expr @ In(v, list) if expr.inSetConvertible =>
+        If(IsNotNull(i.value), FalseLiteral, Literal(null, BooleanType))
+      case expr @ In(_, list) if expr.inSetConvertible =>
         val newList = ExpressionSet(list).toSeq
         if (newList.length == 1
           // TODO: `EqualTo` for structural types are not working. Until SPARK-24443 is addressed,
           // TODO: we exclude them in this rule.
-          && !v.isInstanceOf[CreateNamedStructLike]
+          && !expr.value.isInstanceOf[CreateNamedStructLike]
           && !newList.head.isInstanceOf[CreateNamedStructLike]) {
-          EqualTo(v, newList.head)
+          EqualTo(expr.value, newList.head)
--- End diff --

ditto
[GitHub] spark pull request #22029: [SPARK-24395][SQL] IN operator should return NULL...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22029#discussion_r228393378

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -212,27 +212,27 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] {
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
  * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 3. Replaces [[In (values, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
-      case In(v, list) if list.isEmpty =>
+      case i @ In(_, list) if list.isEmpty =>
         // When v is not nullable, the following expression will be optimized
         // to FalseLiteral which is tested in OptimizeInSuite.scala
-        If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
--- End diff --

this needs to look at `inFalseForNullField` right?
[GitHub] spark pull request #22029: [SPARK-24395][SQL] IN operator should return NULL...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22029#discussion_r228392936

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala ---
@@ -202,7 +209,11 @@ case class InSubquery(values: Seq[Expression], query: ListQuery)
  */
 // scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any valN.",
+  usage = """
+    expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any valN. Otherwise, if
+      spark.sql.legacy.inOperator.falseForNullField is false and any of the elements or fields of
+      the elements is null it returns null, else it returns false.
--- End diff --

I'm wondering if we should use
```
"""
  |xxx
""".stripMargin
```
[GitHub] spark pull request #22812: [SPARK-25817][SQL] Dataset encoder should support...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22812#discussion_r228391626

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -2384,14 +2384,23 @@ class Analyzer(
         case UnresolvedMapObjects(func, inputData, cls) if inputData.resolved =>
           inputData.dataType match {
             case ArrayType(et, cn) =>
-              val expr = MapObjects(func, inputData, et, cn, cls) transformUp {
+              MapObjects(func, inputData, et, cn, cls) transformUp {
                 case UnresolvedExtractValue(child, fieldName) if child.resolved =>
                   ExtractValue(child, fieldName, resolver)
               }
-              expr
             case other =>
               throw new AnalysisException("need an array field but got " + other.catalogString)
           }
+        case u: UnresolvedCatalystToExternalMap if u.child.resolved =>
+          u.child.dataType match {
+            case _: MapType =>
+              CatalystToExternalMap(u) transformUp {
+                case UnresolvedExtractValue(child, fieldName) if child.resolved =>
--- End diff --

TBH I don't quite remember why I did this for `MapObjects`, so I just followed it here. Maybe we can remove it in a followup PR.
[GitHub] spark issue #22775: [SPARK-24709][SQL][FOLLOW-UP] Make schema_of_json's inpu...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22775

if we are ok with this direction, this LGTM except a few minor comments. Thanks!
[GitHub] spark pull request #22775: [SPARK-24709][SQL][FOLLOW-UP] Make schema_of_json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22775#discussion_r228389610

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
@@ -770,8 +776,17 @@ case class SchemaOfJson(
     factory
   }

-  override def convert(v: UTF8String): UTF8String = {
-    val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, v)) { parser =>
+  @transient
+  private lazy val json = child.eval().asInstanceOf[UTF8String]
+
+  override def checkInputDataTypes(): TypeCheckResult = child match {
+    case Literal(s, StringType) if s != null => super.checkInputDataTypes()
+    case _ => TypeCheckResult.TypeCheckFailure(
+      s"The input json should be a string literal and not null; however, got ${child.sql}.")
+  }
+
+  override def eval(v: InternalRow = EmptyRow): Any = {
--- End diff --

when implementing `eval`, we usually don't put the default value. Shall we follow this code style?
[GitHub] spark pull request #22775: [SPARK-24709][SQL][FOLLOW-UP] Make schema_of_json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22775#discussion_r228389510

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
@@ -752,14 +752,20 @@ case class StructsToJson(

 case class SchemaOfJson(
     child: Expression,
     options: Map[String, String])
-  extends UnaryExpression with String2StringExpression with CodegenFallback {
+  extends UnaryExpression with ExpectsInputTypes with CodegenFallback {

   def this(child: Expression) = this(child, Map.empty[String, String])

   def this(child: Expression, options: Expression) = this(
     child = child,
     options = ExprUtils.convertToMapData(options))

+  override def dataType: DataType = StringType
+
+  override def inputTypes: Seq[DataType] = Seq(StringType)
--- End diff --

why do we need it since we already override `checkInputDataTypes`?
[GitHub] spark pull request #22775: [SPARK-24709][SQL][FOLLOW-UP] Make schema_of_json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22775#discussion_r228389378

--- Diff: python/pyspark/sql/functions.py ---
@@ -2375,20 +2375,22 @@ def schema_of_json(col, options={}):

     .. versionchanged:: 3.0
        It accepts `options` parameter to control schema inferring.

-    >>> from pyspark.sql.types import *
-    >>> data = [(1, '{"a": 1}')]
-    >>> df = spark.createDataFrame(data, ("key", "value"))
-    >>> df.select(schema_of_json(df.value).alias("json")).collect()
-    [Row(json=u'struct<a:bigint>')]
+    >>> df = spark.range(1)
     >>> df.select(schema_of_json(lit('{"a": 0}')).alias("json")).collect()
     [Row(json=u'struct<a:bigint>')]
-    >>> schema = schema_of_json(lit('{a: 1}'), {'allowUnquotedFieldNames':'true'})
+    >>> schema = schema_of_json('{a: 1}', {'allowUnquotedFieldNames':'true'})
     >>> df.select(schema.alias("json")).collect()
     [Row(json=u'struct<a:bigint>')]
     """
+    if isinstance(col, basestring):
--- End diff --

shall we do the same for scala APIs? i.e. create `def schema_of_json(json: String)`
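Such an overload would presumably be a thin wrapper over the existing Column variant; a sketch of the suggestion (hypothetical at the time of this review, placed in `org.apache.spark.sql.functions`):

```scala
// Hypothetical Scala counterpart of the Python change: accept a plain JSON
// string and lift it to a literal Column.
def schema_of_json(json: String): Column = schema_of_json(lit(json))
```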
[GitHub] spark issue #22825: [SPARK-25772][SQL][FOLLOWUP] remove GetArrayFromMap
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22825

thanks, merging to master!
[GitHub] spark pull request #22812: [SPARK-25817][SQL] Dataset encoder should support...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22812#discussion_r228388671

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -2384,14 +2384,23 @@ class Analyzer(
         case UnresolvedMapObjects(func, inputData, cls) if inputData.resolved =>
           inputData.dataType match {
             case ArrayType(et, cn) =>
-              val expr = MapObjects(func, inputData, et, cn, cls) transformUp {
+              MapObjects(func, inputData, et, cn, cls) transformUp {
                 case UnresolvedExtractValue(child, fieldName) if child.resolved =>
                   ExtractValue(child, fieldName, resolver)
               }
-              expr
             case other =>
               throw new AnalysisException("need an array field but got " + other.catalogString)
           }
+        case u: UnresolvedCatalystToExternalMap if u.child.resolved =>
+          u.child.dataType match {
+            case _: MapType =>
+              CatalystToExternalMap(u) transformUp {
+                case UnresolvedExtractValue(child, fieldName) if child.resolved =>
--- End diff --

Yea I think so. The `UnresolvedExtractValue` might appear in `CatalystToExternalMap.keyLambdaFunction` and `valueLambdaFunction`.
[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22814#discussion_r228388259

--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala ---
@@ -31,10 +32,32 @@ package object avro {
    * @since 2.4.0
    */
   @Experimental
-  def from_avro(data: Column, jsonFormatSchema: String): Column = {
-    new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema))
+  def from_avro(
+      data: Column,
+      jsonFormatSchema: String): Column = {
+    new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema, Map.empty))
+  }
+
+  /**
+   * Converts a binary column of avro format into its corresponding catalyst value. The specified
+   * schema must match the read data, otherwise the behavior is undefined: it may fail or return
+   * arbitrary result.
+   *
+   * @param data the binary column.
+   * @param jsonFormatSchema the avro schema in JSON string format.
+   * @param options options to control how the Avro record is parsed.
+   *
+   * @since 3.0.0
+   */
+  @Experimental
+  def from_avro(
+      data: Column,
+      jsonFormatSchema: String,
+      options: Map[String, String]): Column = {
--- End diff --

ah, that's a good point, as that's what we do for other APIs as well. We can think more about this later and change all the APIs together if other people complain.
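A usage sketch of the new overload exactly as it appears in this diff (`df`, `schemaJson`, and the `mode` value are illustrative, and as noted above the Scala `Map` signature may still change for Java interop):

```scala
import org.apache.spark.sql.avro.from_avro

// Parse a binary Avro column with an explicit parse mode option.
val parsed = df.select(
  from_avro($"value", schemaJson, Map("mode" -> "PERMISSIVE")).as("record"))
```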
[GitHub] spark pull request #22821: [SPARK-25832][SQL] remove newly added map related...
Github user cloud-fan closed the pull request at: https://github.com/apache/spark/pull/22821