[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/16068 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16068#discussion_r91669101

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
    @@ -487,6 +489,26 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
           assert(count4 == 1)
           sql("DROP TABLE parquet_tmp")
         }
    +
    +  test("Hive Stateful UDF") {
    +    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
    +      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
    +      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
    +      val testData = spark.range(10).repartition(1)
    +
    +      // Expected Max(s) is 10 as statefulUDF returns the sequence number starting from 1.
    +      checkAnswer(testData.selectExpr("statefulUDF() as s").agg(max($"s")), Row(10))
    +
    +      // Expected Max(s) is 5 as statefulUDF returns the sequence number starting from 1,
    +      // and the data is evenly distributed into 2 partitions.
    --- End diff --

    Oh, you are right. I misread the code.
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user zhzhan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16068#discussion_r91570259

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
    @@ -487,6 +489,26 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
           assert(count4 == 1)
           sql("DROP TABLE parquet_tmp")
         }
    +
    +  test("Hive Stateful UDF") {
    +    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
    +      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
    +      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
    +      val testData = spark.range(10).repartition(1)
    +
    +      // Expected Max(s) is 10 as statefulUDF returns the sequence number starting from 1.
    +      checkAnswer(testData.selectExpr("statefulUDF() as s").agg(max($"s")), Row(10))
    +
    +      // Expected Max(s) is 5 as statefulUDF returns the sequence number starting from 1,
    +      // and the data is evenly distributed into 2 partitions.
    --- End diff --

        case logical.Repartition(numPartitions, shuffle, child) =>
          if (shuffle) {
            ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
          } else {
            execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
          }
        case logical.RepartitionByExpression(expressions, child, nPartitions) =>
          exchange.ShuffleExchange(HashPartitioning(
            expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user zhzhan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16068#discussion_r91569919

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
    @@ -487,6 +489,26 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
           assert(count4 == 1)
           sql("DROP TABLE parquet_tmp")
         }
    +
    +  test("Hive Stateful UDF") {
    +    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
    +      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
    +      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
    +      val testData = spark.range(10).repartition(1)
    +
    +      // Expected Max(s) is 10 as statefulUDF returns the sequence number starting from 1.
    +      checkAnswer(testData.selectExpr("statefulUDF() as s").agg(max($"s")), Row(10))
    +
    +      // Expected Max(s) is 5 as statefulUDF returns the sequence number starting from 1,
    +      // and the data is evenly distributed into 2 partitions.
    --- End diff --

    My understanding is that `repartition(2)` uses RoundRobinPartitioning, while `repartition(2, $"id" < 5)` uses the hash implementation. In this case, I found that all rows are shuffled to one partition.
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16068#discussion_r91526428

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
    @@ -487,6 +489,26 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
           assert(count4 == 1)
           sql("DROP TABLE parquet_tmp")
         }
    +
    +  test("Hive Stateful UDF") {
    +    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
    +      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
    +      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
    +      val testData = spark.range(10).repartition(1)
    +
    +      // Expected Max(s) is 10 as statefulUDF returns the sequence number starting from 1.
    +      checkAnswer(testData.selectExpr("statefulUDF() as s").agg(max($"s")), Row(10))
    +
    +      // Expected Max(s) is 5 as statefulUDF returns the sequence number starting from 1,
    +      // and the data is evenly distributed into 2 partitions.
    --- End diff --

    `repartition(2)` doesn't guarantee even distribution; it depends on the hash implementation used in the shuffle. That's why I suggested `repartition(2, $"id" < 5)`.
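The question in this part of the thread is how rows end up spread across partitions, because a UDF that counts rows restarts its count in every partition. The block below is a minimal plain-Scala model of that reasoning, not Spark code: `roundRobin`, `byHash`, and `maxSequence` are hypothetical helpers, and `collidingHash` is an assumed stand-in for whatever hash the shuffle applies.

```scala
object PartitionModel {
  // Round-robin assignment: row i goes to partition i % n, so sizes differ by at most one.
  def roundRobin[A](rows: Seq[A], n: Int): Map[Int, Seq[A]] =
    rows.zipWithIndex
      .groupBy { case (_, i) => i % n }
      .map { case (p, rs) => p -> rs.map(_._1) }

  // Hash assignment: the partition is the non-negative modulus of the key's hash.
  def byHash[A, K](rows: Seq[A], n: Int)(key: A => K)(hash: K => Int): Map[Int, Seq[A]] =
    rows.groupBy(r => ((hash(key(r)) % n) + n) % n)

  // A per-partition counter starting from 1 peaks at the size of the largest partition.
  def maxSequence[A](parts: Map[Int, Seq[A]]): Int =
    if (parts.isEmpty) 0 else parts.values.map(_.size).max

  def main(args: Array[String]): Unit = {
    val ids = (0 until 10).toList

    println(maxSequence(roundRobin(ids, 1))) // one partition holds all 10 rows
    println(maxSequence(roundRobin(ids, 2))) // two partitions of 5 rows each

    // Assumed hash under which both key values map to the same partition (3 % 2 == 5 % 2).
    val collidingHash: Boolean => Int = b => if (b) 3 else 5
    println(maxSequence(byHash(ids, 2)((id: Int) => id < 5)(collidingHash)))
  }
}
```

Under these assumptions the model reproduces the three cases discussed: a single partition, an even round-robin split, and a two-valued key whose hashes collide so that every row lands in one partition.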
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16068#discussion_r91248703

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
    @@ -487,6 +489,28 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
           assert(count4 == 1)
           sql("DROP TABLE parquet_tmp")
         }
    +
    +  test("Hive Stateful UDF") {
    +    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
    +      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
    +      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
    +      val testData = spark.range(10).repartition(1)
    +      val testData1 = spark.range(10).repartition(2)
    +
    +      // Expected Max(s) is 10 as statefulUDF returns the sequence number starting from 1.
    +      checkAnswer(testData.selectExpr("statefulUDF() as s").agg(max($"s")), Row(10))
    +
    +      // Expected Max(s) is 10 as statefulUDF returns the sequence number starting from 1.
    +      // Note that testData.union(testData) will have Row(20) because internally it is converted
    --- End diff --

    If you need to write comments to explain some technical problems for the test, it would be better to rewrite the test and avoid this problem. How about:
    ```
    checkAnswer(
      spark.range(10).repartition(2, $"id" < 5).selectExpr("statefulUDF() as s").agg(max($"s")),
      Row(5))
    ```
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16068#discussion_r91150298

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
    @@ -487,6 +488,52 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
           assert(count4 == 1)
           sql("DROP TABLE parquet_tmp")
         }
    +
    +  test("Hive Stateful UDF") {
    +    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
    +      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
    +      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
    +      withTempView("inputTable") {
    --- End diff --

    ```Scala
    val testData = spark.range(10).repartition(1)
    testData.selectExpr("statefulUDF() as s")
    ```
    Try this?
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user zhzhan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16068#discussion_r91142141

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
    @@ -487,6 +488,52 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
           assert(count4 == 1)
           sql("DROP TABLE parquet_tmp")
         }
    +
    +  test("Hive Stateful UDF") {
    +    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
    +      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
    +      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
    +      withTempView("inputTable") {
    --- End diff --

    The test cannot resolve the function and throws the error below if I use:

        test("Hive Stateful UDF") {
          withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
            sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
            sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
            val testData = spark.range(10).repartition(1)
            println(s"start session: $spark")
            val m = testData.select("statefulUDF() as s, ")
            checkAnswer(testData.select("statefulUDF() as s").agg(max($"s")), Row(10))
            ...

    Am I missing anything, or is it a bug? I will investigate why this happens.

    Failed to analyze query: org.apache.spark.sql.AnalysisException: cannot resolve '`statefulUDF() as s`' given input columns: [id];;
    'Project ['statefulUDF() as s]
    +- Repartition 1, true
       +- Range (0, 10, step=1, splits=Some(1))

    org.apache.spark.sql.AnalysisException: cannot resolve '`statefulUDF() as s`' given input columns: [id];;
    'Project ['statefulUDF() as s]
    +- Repartition 1, true
       +- Range (0, 10, step=1, splits=Some(1))

        at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:314)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:314)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:313)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:282)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:292)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:296)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:296)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$7.apply(QueryPlan.scala:301)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:192)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:301)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:74)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:132)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:63)
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16068#discussion_r91027951

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
    @@ -487,6 +488,52 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
           assert(count4 == 1)
           sql("DROP TABLE parquet_tmp")
         }
    +
    +  test("Hive Stateful UDF") {
    +    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
    +      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
    +      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
    +      withTempView("inputTable") {
    --- End diff --

    We can simplify this test a bit:
    ```
    val testData = spark.range(10)
    checkAnswer(testData.repartition(1).select("statefulUDF() as s").max($"s"), Row(10))
    ...
    ```
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user zhzhan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16068#discussion_r91026585

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
    @@ -487,6 +488,52 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
           assert(count4 == 1)
           sql("DROP TABLE parquet_tmp")
         }
    +
    +  test("Hive Stateful UDF") {
    +    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
    +      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
    +      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
    +      withTempView("inputTable") {
    +        val testData = spark.sparkContext.parallelize(
    +          (0 until 10) map (x => IntegerCaseClass(1)), 2).toDF()
    +        testData.createOrReplaceTempView("inputTable")
    +        // Distribute all rows to one partition (all rows have the same content),
    --- End diff --

    @cloud-fan Thanks for the review. Because every row contains only IntegerCaseClass(1), RepartitionByExpression assigns all rows to one partition, which therefore has 10 records.
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user zhzhan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16068#discussion_r91026433

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
    @@ -487,6 +488,52 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
           assert(count4 == 1)
           sql("DROP TABLE parquet_tmp")
         }
    +
    +  test("Hive Stateful UDF") {
    +    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
    +      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
    +      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
    +      withTempView("inputTable") {
    +        val testData = spark.sparkContext.parallelize(
    +          (0 until 10) map (x => IntegerCaseClass(1)), 2).toDF()
    +        testData.createOrReplaceTempView("inputTable")
    +        // Distribute all rows to one partition (all rows have the same content),
    +        // and expected Max(s) is 10 as statefulUDF returns the sequence number starting from 1.
    +        checkAnswer(
    +          sql(
    +            """
    +            |SELECT MAX(s) FROM
    +            |  (SELECT statefulUDF() as s FROM
    +            |    (SELECT i from inputTable DISTRIBUTE by i) a
    +            |  ) b
    +            """.stripMargin),
    +          Row(10))
    +
    +        // Expected Max(s) is 5, as there are 2 partitions with 5 rows each, and statefulUDF
    +        // returns the sequence number of the rows in the partition starting from 1.
    +        checkAnswer(
    +          sql(
    +            """
    +              |SELECT MAX(s) FROM
    +              |  (SELECT statefulUDF() as s FROM
    +              |    (SELECT i from inputTable) a
    +              |  ) b
    +            """.stripMargin),
    +          Row(5))
    +
    +        // Expected Max(s) is 1, as stateless UDF is deterministic and replaced by constant 1.
    --- End diff --

    StatelessUDF is foldable:

        override def foldable: Boolean = isUDFDeterministic && children.forall(_.foldable)

    The ConstantFolding optimizer will replace it with a constant:

        case e if e.foldable => Literal.create(e.eval(EmptyRow), e.dataType)

    Here is the explain(true):

    == Parsed Logical Plan ==
    'Project [unresolvedalias('MAX('s), None)]
    +- 'SubqueryAlias b
       +- 'Project ['statelessUDF() AS s#39]
          +- 'SubqueryAlias a
             +- 'RepartitionByExpression ['i]
                +- 'Project ['i]
                   +- 'UnresolvedRelation `inputTable`

    == Analyzed Logical Plan ==
    max(s): bigint
    Aggregate [max(s#39L) AS max(s)#46L]
    +- SubqueryAlias b
       +- Project [HiveSimpleUDF#org.apache.spark.sql.hive.execution.StatelessUDF() AS s#39L]
          +- SubqueryAlias a
             +- RepartitionByExpression [i#4]
                +- Project [i#4]
                   +- SubqueryAlias inputtable
                      +- SerializeFromObject [assertnotnull(assertnotnull(input[0, org.apache.spark.sql.hive.execution.IntegerCaseClass, true], top level Product input object), - root class: "org.apache.spark.sql.hive.execution.IntegerCaseClass").i AS i#4]
                         +- ExternalRDD [obj#3]

    == Optimized Logical Plan ==
    Aggregate [max(s#39L) AS max(s)#46L]
    +- Project [1 AS s#39L]
       +- RepartitionByExpression [i#4]
          +- SerializeFromObject [assertnotnull(assertnotnull(input[0, org.apache.spark.sql.hive.execution.IntegerCaseClass, true], top level Product input object), - root class: "org.apache.spark.sql.hive.execution.IntegerCaseClass").i AS i#4]
             +- ExternalRDD [obj#3]

    == Physical Plan ==
    *HashAggregate(keys=[], functions=[max(s#39L)], output=[max(s)#46L])
    +- Exchange SinglePartition
       +- *HashAggregate(keys=[], functions=[partial_max(s#39L)], output=[max#48L])
          +- *Project [1 AS s#39L]
             +- Exchange hashpartitioning(i#4, 5)
                +- *SerializeFromObject [assertnotnull(assertnotnull(input[0, org.apache.spark.sql.hive.execution.IntegerCaseClass, true], top level Product input object), - root class: "org.apache.spark.sql.hive.execution.IntegerCaseClass").i AS i#4]
                   +- Scan ExternalRDDScan[obj#3]
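The folding behaviour described in this message can be illustrated with a small self-contained model. This is a sketch in plain Scala, not Catalyst code: `Expr`, `Lit`, `UdfCall`, and `constantFold` are hypothetical names, and the stateless UDF is assumed to return the constant 1, as the `Project [1 AS s#39L]` node in the optimized plan suggests.

```scala
object FoldingModel {
  sealed trait Expr {
    def deterministic: Boolean
    def children: Seq[Expr]
    // Same shape as the quoted rule: foldable only if deterministic and all children are foldable.
    def foldable: Boolean = deterministic && children.forall(_.foldable)
    def eval(): Long
  }

  final case class Lit(value: Long) extends Expr {
    val deterministic = true
    val children: Seq[Expr] = Nil
    def eval(): Long = value
  }

  // Hypothetical zero-argument UDF call; `body` stands in for the UDF's evaluate method.
  final case class UdfCall(name: String, deterministic: Boolean, body: () => Long) extends Expr {
    val children: Seq[Expr] = Nil
    def eval(): Long = body()
  }

  // Simplified constant folding: a foldable expression is evaluated once, at planning time.
  def constantFold(e: Expr): Expr = if (e.foldable) Lit(e.eval()) else e

  def main(args: Array[String]): Unit = {
    var counter = 0L
    val stateless = UdfCall("statelessUDF", deterministic = true, () => 1L)
    val stateful = UdfCall("statefulUDF", deterministic = false, () => { counter += 1; counter })

    println(constantFold(stateless))            // prints "Lit(1)"
    println(constantFold(stateful) eq stateful) // prints "true": left for per-row evaluation
    println(Seq.fill(3)(stateful.eval()).max)   // prints "3": one call per row
  }
}
```

If the stateful call were marked deterministic in this model, `constantFold` would evaluate it once and freeze the result as a literal, which is the failure mode the pull request is guarding against.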
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16068#discussion_r91020060

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
    @@ -487,6 +488,52 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
           assert(count4 == 1)
           sql("DROP TABLE parquet_tmp")
         }
    +
    +  test("Hive Stateful UDF") {
    +    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
    +      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
    +      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
    +      withTempView("inputTable") {
    +        val testData = spark.sparkContext.parallelize(
    +          (0 until 10) map (x => IntegerCaseClass(1)), 2).toDF()
    +        testData.createOrReplaceTempView("inputTable")
    +        // Distribute all rows to one partition (all rows have the same content),
    --- End diff --

    Why can `DISTRIBUTE BY` distribute all rows to one partition? It's implemented by `RepartitionByExpression`, which doesn't always use one partition.
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16068#discussion_r91000387

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
    @@ -487,6 +488,45 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
           assert(count4 == 1)
           sql("DROP TABLE parquet_tmp")
         }
    +
    +  test("Hive Stateful UDF") {
    +    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
    +      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
    +      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
    +      val testData = spark.sparkContext.parallelize(
    +        (0 until 10) map (x => IntegerCaseClass(1)), 2).toDF()
    +      testData.createOrReplaceTempView("inputTable")
    +      checkAnswer(
    +        sql(
    +          """
    +            |SELECT MAX(s) FROM
    +            |  (SELECT statefulUDF() as s FROM
    +            |    (SELECT i from inputTable DISTRIBUTE by i) a
    +            |  ) b
    +          """.stripMargin),
    +        Row(10))
    --- End diff --

    Can you add some comments to explain this test? Why is the expected result 10?
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16068#discussion_r90953581

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
    @@ -487,6 +488,43 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
           assert(count4 == 1)
           sql("DROP TABLE parquet_tmp")
         }
    +
    +  test("Hive Stateful UDF") {
    +    sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
    --- End diff --

    Could you use `withUserDefinedFunction` for these two functions?
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user zhzhan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16068#discussion_r90763121

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
    @@ -487,6 +488,29 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
           assert(count4 == 1)
           sql("DROP TABLE parquet_tmp")
         }
    +
    +  test("Hive Stateful UDF") {
    +    sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
    +    sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
    +    val testData = spark.sparkContext.parallelize(
    +      (0 until 10) map(x => IntegerCaseClass(1)), 2).toDF()
    +    testData.createOrReplaceTempView("inputTable")
    +    val max1 =
    +      sql("SELECT MAX(s) FROM (" +
    +        "SELECT statefulUDF() as s FROM (SELECT i from inputTable DISTRIBUTE by i) a" +
    +        ") b").head().getLong(0)
    --- End diff --

    I will rewrite it after gathering feedback from others.
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16068#discussion_r90756326

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
    @@ -144,7 +144,7 @@ private[hive] case class HiveGenericUDF(
       @transient
       private lazy val isUDFDeterministic = {
         val udfType = function.getClass.getAnnotation(classOf[HiveUDFType])
    -    udfType != null && udfType.deterministic()
    +    udfType != null && udfType.deterministic() && !udfType.stateful()
    --- End diff --

    An unrelated question: what's the difference between `udfType.deterministic` and `udfType.stateful`?
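To make the effect of the one-line change in this diff concrete, here is a minimal plain-Scala sketch of the patched predicate. It does not use Hive's annotation classes: `UdfTypeInfo` is a hypothetical stand-in for the two flags read from the annotation, and `None` models a UDF class that carries no annotation (the `udfType != null` check in the diff).

```scala
object UdfDeterminism {
  // Hypothetical stand-in for the flags carried by the UDF type annotation.
  final case class UdfTypeInfo(deterministic: Boolean, stateful: Boolean)

  // Mirrors the patched condition: annotated, deterministic, and not stateful.
  def isUdfDeterministic(udfType: Option[UdfTypeInfo]): Boolean =
    udfType.exists(t => t.deterministic && !t.stateful)

  def main(args: Array[String]): Unit = {
    println(isUdfDeterministic(None))                                                   // false
    println(isUdfDeterministic(Some(UdfTypeInfo(deterministic = true, stateful = false)))) // true
    // A UDF flagged both deterministic and stateful is now treated as non-deterministic.
    println(isUdfDeterministic(Some(UdfTypeInfo(deterministic = true, stateful = true))))  // false
  }
}
```

The third case is the only one whose result differs from the pre-patch condition, which looked at the `deterministic` flag alone.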
[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16068#discussion_r90752814

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
    @@ -487,6 +488,29 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
           assert(count4 == 1)
           sql("DROP TABLE parquet_tmp")
         }
    +
    +  test("Hive Stateful UDF") {
    +    sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
    +    sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
    +    val testData = spark.sparkContext.parallelize(
    +      (0 until 10) map(x => IntegerCaseClass(1)), 2).toDF()
    +    testData.createOrReplaceTempView("inputTable")
    +    val max1 =
    +      sql("SELECT MAX(s) FROM (" +
    +        "SELECT statefulUDF() as s FROM (SELECT i from inputTable DISTRIBUTE by i) a" +
    +        ") b").head().getLong(0)
    --- End diff --

    How about using multiline string syntax here?
    ```Scala
    sql(
      """
        |SELECT MAX(s) FROM
        |  (SELECT statefulUDF() as s FROM
        |    (SELECT i from inputTable DISTRIBUTE BY i) a
        |  ) b
      """.stripMargin)
    ```
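For readers unfamiliar with the multiline string syntax suggested here, the following is a small standalone sketch of what `stripMargin` does to such a literal. It only builds the string and does not call Spark; `StripMarginDemo` is a hypothetical name and the query text is taken from the suggestion.

```scala
object StripMarginDemo {
  // stripMargin removes leading whitespace up to and including the first '|' on each line,
  // so the SQL can be indented with the surrounding code without changing its content.
  val query: String =
    """
      |SELECT MAX(s) FROM
      |  (SELECT statefulUDF() as s FROM
      |    (SELECT i from inputTable DISTRIBUTE BY i) a
      |  ) b
    """.stripMargin

  def main(args: Array[String]): Unit = println(query)
}
```

Compared with string concatenation, this keeps each line of the query visually aligned and avoids missing spaces at the joins between fragments.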