[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16068


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r91669101
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
@@ -487,6 +489,26 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     assert(count4 == 1)
     sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
+      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
+      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
+      val testData = spark.range(10).repartition(1)
+
+      // Expected Max(s) is 10 as statefulUDF returns the sequence number starting from 1.
+      checkAnswer(testData.selectExpr("statefulUDF() as s").agg(max($"s")), Row(10))
+
+      // Expected Max(s) is 5 as statefulUDF returns the sequence number starting from 1,
+      // and the data is evenly distributed into 2 partitions.
--- End diff --

oh you are right, I misread the code.





[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-08 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r91570259
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
@@ -487,6 +489,26 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     assert(count4 == 1)
     sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
+      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
+      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
+      val testData = spark.range(10).repartition(1)
+
+      // Expected Max(s) is 10 as statefulUDF returns the sequence number starting from 1.
+      checkAnswer(testData.selectExpr("statefulUDF() as s").agg(max($"s")), Row(10))
+
+      // Expected Max(s) is 5 as statefulUDF returns the sequence number starting from 1,
+      // and the data is evenly distributed into 2 partitions.
--- End diff --

  case logical.Repartition(numPartitions, shuffle, child) =>
    if (shuffle) {
      ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
    } else {
      execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
    }

  case logical.RepartitionByExpression(expressions, child, nPartitions) =>
    exchange.ShuffleExchange(HashPartitioning(
      expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
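The two planner cases above can be illustrated with a small plain-Scala model (no Spark required). It is a sketch under stated assumptions: `RoundRobinModel`, `RowSequence`, and `roundRobin` are hypothetical names, `RowSequence` imitates a row-sequence style stateful UDF that keeps one counter per task, and the `startOffset` parameter stands in for whatever start position the round-robin shuffle picks. With a single input partition, dealing 10 rows into 2 partitions yields 5 rows each regardless of the offset, so MAX(statefulUDF()) is 5; with 1 partition it is 10.

```scala
// Plain-Scala model (no Spark) of RoundRobinPartitioning followed by a stateful,
// row-sequence style UDF that is evaluated independently in each output partition.
object RoundRobinModel {
  // Hypothetical stateful UDF: returns 1, 2, 3, ... on successive calls.
  // One instance per partition, mimicking one UDF instance per task.
  final class RowSequence {
    private var counter = 0L
    def evaluate(): Long = { counter += 1; counter }
  }

  // Deal rows into numPartitions buckets in turn, starting at startOffset
  // (assumption: stands in for the start position chosen by the shuffle).
  def roundRobin[T](rows: Seq[T], numPartitions: Int, startOffset: Int): Vector[Vector[T]] = {
    val buckets = Vector.fill(numPartitions)(Vector.newBuilder[T])
    rows.zipWithIndex.foreach { case (row, i) =>
      buckets((startOffset + i) % numPartitions) += row
    }
    buckets.map(_.result())
  }

  // MAX(statefulUDF()) over all partitions; each non-empty partition gets a fresh counter.
  def maxSequence[T](partitions: Seq[Seq[T]]): Long =
    partitions.filter(_.nonEmpty).map { part =>
      val udf = new RowSequence
      part.map(_ => udf.evaluate()).max
    }.max

  def main(args: Array[String]): Unit = {
    val rows = 0L until 10L
    println(maxSequence(roundRobin(rows, numPartitions = 1, startOffset = 0))) // 10
    println(maxSequence(roundRobin(rows, numPartitions = 2, startOffset = 1))) // 5
  }
}
```

The offset only changes which bucket receives the extra row when the row count is not divisible by the partition count; here 10 rows split exactly 5/5.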





[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-08 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r91569919
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
@@ -487,6 +489,26 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     assert(count4 == 1)
     sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
+      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
+      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
+      val testData = spark.range(10).repartition(1)
+
+      // Expected Max(s) is 10 as statefulUDF returns the sequence number starting from 1.
+      checkAnswer(testData.selectExpr("statefulUDF() as s").agg(max($"s")), Row(10))
+
+      // Expected Max(s) is 5 as statefulUDF returns the sequence number starting from 1,
+      // and the data is evenly distributed into 2 partitions.
--- End diff --

My understanding is that repartition(2) uses RoundRobinPartitioning, while
repartition(2, $"id" < 5) uses hash partitioning. In this case, I found that all
rows are shuffled to one partition.
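Why can hashing on the two-valued key `$"id" < 5` leave one partition empty? Hash partitioning sends a row to a non-negative `hash(key) mod numPartitions`, and nothing forces two distinct keys into two distinct buckets. The plain-Scala sketch below uses `java.lang.Boolean.hashCode` (1231 for true, 1237 for false) purely as a stand-in hash; Spark's `HashPartitioning` uses a Murmur3-based hash instead, so this does not reproduce Spark's exact bucket assignment, only the kind of collision observed above. `HashCollisionModel`, `partitionOf`, and `bucketSizes` are hypothetical names.

```scala
// Plain-Scala illustration of hash partitioning a boolean key into 2 partitions.
// Assumption: java.lang.Boolean.hashCode stands in for Spark's Murmur3-based hash.
object HashCollisionModel {
  // Non-negative modulo, as used to map a hash value to a partition index.
  def partitionOf(hash: Int, numPartitions: Int): Int = Math.floorMod(hash, numPartitions)

  // Number of rows that land in each non-empty partition.
  def bucketSizes(keys: Seq[Boolean], numPartitions: Int): Map[Int, Int] =
    keys
      .groupBy(k => partitionOf(java.lang.Boolean.hashCode(k), numPartitions))
      .map { case (partition, rows) => partition -> rows.size }

  def main(args: Array[String]): Unit = {
    // Key of repartition(2, $"id" < 5) for id in 0 until 10.
    val keys = (0L until 10L).map(_ < 5L)
    // 1231 and 1237 are both odd, so every row goes to partition 1.
    println(bucketSizes(keys, 2)) // Map(1 -> 10)
  }
}
```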





[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r91526428
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
@@ -487,6 +489,26 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     assert(count4 == 1)
     sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
+      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
+      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
+      val testData = spark.range(10).repartition(1)
+
+      // Expected Max(s) is 10 as statefulUDF returns the sequence number starting from 1.
+      checkAnswer(testData.selectExpr("statefulUDF() as s").agg(max($"s")), Row(10))
+
+      // Expected Max(s) is 5 as statefulUDF returns the sequence number starting from 1,
+      // and the data is evenly distributed into 2 partitions.
--- End diff --

`repartition(2)` doesn't guarantee an even distribution; it depends on the
hash implementation used in the shuffle. That's why I suggest `repartition(2, $"id"
< 5)`.





[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r91248703
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
@@ -487,6 +489,28 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     assert(count4 == 1)
     sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
+      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
+      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
+      val testData = spark.range(10).repartition(1)
+      val testData1 = spark.range(10).repartition(2)
+
+      // Expected Max(s) is 10 as statefulUDF returns the sequence number starting from 1.
+      checkAnswer(testData.selectExpr("statefulUDF() as s").agg(max($"s")), Row(10))
+
+      // Expected Max(s) is 10 as statefulUDF returns the sequence number starting from 1.
+      // Note that testData.union(testData) will have Row(20) because internally it is converted
--- End diff --

if you need to write comments to explain some technical problems for the 
test, it would be better to rewrite the test and avoid this problem. How about:
```
checkAnswer(
  spark.range(10).repartition(2, $"id" < 5).selectExpr("statefulUDF() as s").agg(max($"s")),
  Row(5))
```





[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-06 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r91150298
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
@@ -487,6 +488,52 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     assert(count4 == 1)
     sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
+      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
+      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
+      withTempView("inputTable") {
--- End diff --

```Scala
val testData = spark.range(10).repartition(1)
testData.selectExpr("statefulUDF() as s")
```

Try this?





[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-06 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r91142141
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
@@ -487,6 +488,52 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     assert(count4 == 1)
     sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
+      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
+      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
+      withTempView("inputTable") {
--- End diff --

The test cannot resolve the function and throws the following error if I use:

  test("Hive Stateful UDF") {
    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
      val testData = spark.range(10).repartition(1)
      println(s"start session: $spark")
      val m = testData.select("statefulUDF() as s, ")
      checkAnswer(testData.select("statefulUDF() as s").agg(max($"s")), Row(10))
      ...

Am I missing anything, or is it a bug? I will investigate why this happens.

Failed to analyze query: org.apache.spark.sql.AnalysisException: cannot resolve '`statefulUDF() as s`' given input columns: [id];;
'Project ['statefulUDF() as s]
+- Repartition 1, true
   +- Range (0, 10, step=1, splits=Some(1))

org.apache.spark.sql.AnalysisException: cannot resolve '`statefulUDF() as s`' given input columns: [id];;
'Project ['statefulUDF() as s]
+- Repartition 1, true
   +- Range (0, 10, step=1, splits=Some(1))

    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:314)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:314)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:313)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:282)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:292)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:296)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:296)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$7.apply(QueryPlan.scala:301)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:192)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:301)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:74)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:132)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:63)
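The analysis error above is consistent with how `Dataset.select(String*)` works: each string is resolved as a column name, so `"statefulUDF() as s"` is looked up as a column literally named that, while `Dataset.selectExpr` parses each string as a SQL expression (the form used in the later revisions of the test). A minimal sketch, assuming a Spark 2.x `spark-sql` dependency and local mode; it substitutes the built-in expression `id + 1` for the Hive test UDF, and `SelectVsSelectExpr` is a hypothetical name.

```scala
// Assumes spark-sql 2.x on the classpath; runs a local SparkSession.
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.functions.max

object SelectVsSelectExpr {
  // Returns (whether select(String) failed analysis, MAX(s) computed via selectExpr).
  def run(): (Boolean, Long) = {
    val spark = SparkSession.builder().master("local[1]").appName("select-vs-selectExpr").getOrCreate()
    import spark.implicits._
    try {
      val testData = spark.range(10).repartition(1)

      // select(String*) treats the string as a column name: "id + 1 as s" does not exist.
      val selectFailed =
        try { testData.select("id + 1 as s").collect(); false }
        catch { case _: AnalysisException => true }

      // selectExpr parses the string as a SQL expression with an alias.
      val maxS = testData.selectExpr("id + 1 as s").agg(max($"s")).head().getLong(0)
      (selectFailed, maxS)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```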



[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r91027951
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
@@ -487,6 +488,52 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     assert(count4 == 1)
     sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
+      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
+      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
+      withTempView("inputTable") {
--- End diff --

we can simplify this test a bit:
```
val testData = spark.range(10)
checkAnswer(testData.repartition(1).select("statefulUDF() as s").max($"s"), 
Row(10))
...
```





[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-05 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r91026585
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
@@ -487,6 +488,52 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     assert(count4 == 1)
     sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
+      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
+      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
+      withTempView("inputTable") {
+        val testData = spark.sparkContext.parallelize(
+          (0 until 10) map (x => IntegerCaseClass(1)), 2).toDF()
+        testData.createOrReplaceTempView("inputTable")
+        // Distribute all rows to one partition (all rows have the same content),
--- End diff --

@cloud-fan Thanks for the review. Because all rows contain only
IntegerCaseClass(1), RepartitionByExpression assigns all of them to the same
partition, which therefore has all 10 records.
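This argument can be checked with a plain-Scala sketch: whatever hash function is used, rows with equal keys land in the same partition, and because a row-sequence style UDF restarts at 1 in each partition, MAX(s) equals the size of the largest partition. It depends on the data rather than on `DISTRIBUTE BY` itself, since distinct keys spread out. The sketch assumes `Int.hashCode` (the identity) as a stand-in for Spark's Murmur3-based hash and 5 shuffle partitions, matching the `Exchange hashpartitioning(i#4, 5)` in the physical plan quoted in this thread; `DistributeByModel` is a hypothetical name.

```scala
// Plain-Scala model of DISTRIBUTE BY i (hash partitioning on i) followed by a
// row-sequence style stateful UDF. Assumption: Int.hashCode (identity) replaces
// Spark's Murmur3-based hash; any deterministic hash keeps equal keys together.
object DistributeByModel {
  // Group rows by partition index = non-negative (hash(key) mod numPartitions).
  def distributeBy(keys: Seq[Int], numPartitions: Int): Map[Int, Seq[Int]] =
    keys.groupBy(k => Math.floorMod(k.hashCode, numPartitions))

  // A counter restarting at 1 in every partition reaches the partition size,
  // so MAX(s) over the whole query is the size of the largest partition.
  def maxSequence(partitions: Map[Int, Seq[Int]]): Int =
    partitions.values.map(_.size).max

  def main(args: Array[String]): Unit = {
    val sameKey = Seq.fill(10)(1)   // ten IntegerCaseClass(1) rows, i.e. i = 1
    val distinctKeys = 0 until 10   // i = 0, 1, ..., 9
    println(maxSequence(distributeBy(sameKey, 5)))      // 10: one partition holds every row
    println(maxSequence(distributeBy(distinctKeys, 5))) // 2: keys spread 2 per partition
  }
}
```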





[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-05 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r91026433
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
@@ -487,6 +488,52 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     assert(count4 == 1)
     sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
+      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
+      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
+      withTempView("inputTable") {
+        val testData = spark.sparkContext.parallelize(
+          (0 until 10) map (x => IntegerCaseClass(1)), 2).toDF()
+        testData.createOrReplaceTempView("inputTable")
+        // Distribute all rows to one partition (all rows have the same content),
+        // and expected Max(s) is 10 as statefulUDF returns the sequence number starting from 1.
+        checkAnswer(
+          sql(
+            """
+              |SELECT MAX(s) FROM
+              |  (SELECT statefulUDF() as s FROM
+              |    (SELECT i from inputTable DISTRIBUTE by i) a
+              |  ) b
+            """.stripMargin),
+          Row(10))
+
+        // Expected Max(s) is 5, as there are 2 partitions with 5 rows each, and statefulUDF
+        // returns the sequence number of the rows in the partition starting from 1.
+        checkAnswer(
+          sql(
+            """
+              |SELECT MAX(s) FROM
+              |  (SELECT statefulUDF() as s FROM
+              |    (SELECT i from inputTable) a
+              |  ) b
+            """.stripMargin),
+          Row(5))
+
+        // Expected Max(s) is 1, as stateless UDF is deterministic and replaced by constant 1.
--- End diff --

StatelessUDF is foldable:
  override def foldable: Boolean = isUDFDeterministic && children.forall(_.foldable)

The ConstantFolding optimizer rule replaces it with a constant:
  case e if e.foldable => Literal.create(e.eval(EmptyRow), e.dataType)

Here is the output of explain(true):
== Parsed Logical Plan ==
'Project [unresolvedalias('MAX('s), None)]
+- 'SubqueryAlias b
   +- 'Project ['statelessUDF() AS s#39]
      +- 'SubqueryAlias a
         +- 'RepartitionByExpression ['i]
            +- 'Project ['i]
               +- 'UnresolvedRelation `inputTable`

== Analyzed Logical Plan ==
max(s): bigint
Aggregate [max(s#39L) AS max(s)#46L]
+- SubqueryAlias b
   +- Project [HiveSimpleUDF#org.apache.spark.sql.hive.execution.StatelessUDF() AS s#39L]
      +- SubqueryAlias a
         +- RepartitionByExpression [i#4]
            +- Project [i#4]
               +- SubqueryAlias inputtable
                  +- SerializeFromObject [assertnotnull(assertnotnull(input[0, org.apache.spark.sql.hive.execution.IntegerCaseClass, true], top level Product input object), - root class: "org.apache.spark.sql.hive.execution.IntegerCaseClass").i AS i#4]
                     +- ExternalRDD [obj#3]

== Optimized Logical Plan ==
Aggregate [max(s#39L) AS max(s)#46L]
+- Project [1 AS s#39L]
   +- RepartitionByExpression [i#4]
      +- SerializeFromObject [assertnotnull(assertnotnull(input[0, org.apache.spark.sql.hive.execution.IntegerCaseClass, true], top level Product input object), - root class: "org.apache.spark.sql.hive.execution.IntegerCaseClass").i AS i#4]
         +- ExternalRDD [obj#3]

== Physical Plan ==
*HashAggregate(keys=[], functions=[max(s#39L)], output=[max(s)#46L])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_max(s#39L)], output=[max#48L])
      +- *Project [1 AS s#39L]
         +- Exchange hashpartitioning(i#4, 5)
            +- *SerializeFromObject [assertnotnull(assertnotnull(input[0, org.apache.spark.sql.hive.execution.IntegerCaseClass, true], top level Product input object), - root class: "org.apache.spark.sql.hive.execution.IntegerCaseClass").i AS i#4]
               +- Scan ExternalRDDScan[obj#3]
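The plan change above (the UDF call replaced by `1 AS s#39L`) can be mimicked with a tiny expression-tree model. This is a sketch, not Catalyst: `Expr`, `Literal`, `Udf`, `constantFold`, `project`, and `rowSequence` are hypothetical stand-ins. `foldable` follows the rule quoted above (deterministic and all children foldable), and `constantFold` replaces a foldable expression with a literal of its value, evaluated once. A stateful UDF that is wrongly reported as deterministic gets folded the same way, which is the behavior this pull request prevents.

```scala
// Toy model of Catalyst-style constant folding (hypothetical types, not Spark's).
object ConstantFoldingModel {
  sealed trait Expr {
    def foldable: Boolean
    def eval(): Long
  }

  final case class Literal(value: Long) extends Expr {
    val foldable: Boolean = true
    def eval(): Long = value
  }

  // Zero-argument UDF; `deterministic` plays the role of isUDFDeterministic.
  final class Udf(name: String, deterministic: Boolean, body: () => Long) extends Expr {
    val children: Seq[Expr] = Nil
    def foldable: Boolean = deterministic && children.forall(_.foldable)
    def eval(): Long = body()
    override def toString: String = s"$name()"
  }

  // ConstantFolding-style rule: a foldable expression is evaluated once and
  // replaced by a literal holding the result.
  def constantFold(e: Expr): Expr = if (e.foldable) Literal(e.eval()) else e

  // Evaluate the (possibly folded) projection once per input row.
  def project(e: Expr, numRows: Int): Seq[Long] = {
    val optimized = constantFold(e)
    Seq.fill(numRows)(optimized.eval())
  }

  // Stateful body returning 1, 2, 3, ... on successive calls.
  def rowSequence(): () => Long = {
    var n = 0L
    () => { n += 1; n }
  }

  def main(args: Array[String]): Unit = {
    val stateless = new Udf(name = "statelessUDF", deterministic = true, body = () => 1L)
    val stateful = new Udf(name = "statefulUDF", deterministic = false, body = rowSequence())
    val mislabelled = new Udf(name = "statefulUDF", deterministic = true, body = rowSequence())
    println(project(stateless, 10).max)   // 1
    println(project(stateful, 10).max)    // 10
    println(project(mislabelled, 10).max) // 1: folded to a constant, state never advances
  }
}
```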






[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r91020060
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
@@ -487,6 +488,52 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     assert(count4 == 1)
     sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
+      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
+      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
+      withTempView("inputTable") {
+        val testData = spark.sparkContext.parallelize(
+          (0 until 10) map (x => IntegerCaseClass(1)), 2).toDF()
+        testData.createOrReplaceTempView("inputTable")
+        // Distribute all rows to one partition (all rows have the same content),
--- End diff --

Why can `DISTRIBUTE BY` distribute all rows to one partition? It's
implemented by `RepartitionByExpression`, which doesn't always use one partition.





[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r91000387
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
@@ -487,6 +488,45 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     assert(count4 == 1)
     sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+    withUserDefinedFunction("statefulUDF" -> true, "statelessUDF" -> true) {
+      sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
+      sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
+      val testData = spark.sparkContext.parallelize(
+        (0 until 10) map (x => IntegerCaseClass(1)), 2).toDF()
+      testData.createOrReplaceTempView("inputTable")
+      checkAnswer(
+        sql(
+          """
+            |SELECT MAX(s) FROM
+            |  (SELECT statefulUDF() as s FROM
+            |    (SELECT i from inputTable DISTRIBUTE by i) a
+            |  ) b
+          """.stripMargin),
+        Row(10))
--- End diff --

Can you add some comments to explain this test? Why is the expected result
10?





[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r90953581
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
@@ -487,6 +488,43 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     assert(count4 == 1)
     sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+    sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
--- End diff --

Could you use `withUserDefinedFunction` for these two functions? 





[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-03 Thread zhzhan
Github user zhzhan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r90763121
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
@@ -487,6 +488,29 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     assert(count4 == 1)
     sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+    sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
+    sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
+    val testData = spark.sparkContext.parallelize(
+      (0 until 10) map(x => IntegerCaseClass(1)), 2).toDF()
+    testData.createOrReplaceTempView("inputTable")
+    val max1 =
+      sql("SELECT MAX(s) FROM (" +
+        "SELECT statefulUDF() as s FROM (SELECT i from inputTable DISTRIBUTE by i) a" +
+        ") b").head().getLong(0)
--- End diff --

Will rewrite it after gathering feedback from others.





[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r90756326
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -144,7 +144,7 @@ private[hive] case class HiveGenericUDF(
   @transient
   private lazy val isUDFDeterministic = {
     val udfType = function.getClass.getAnnotation(classOf[HiveUDFType])
-    udfType != null && udfType.deterministic()
+    udfType != null && udfType.deterministic() && !udfType.stateful()
--- End diff --

An unrelated question: what's the difference between
`udfType.deterministic` and `udfType.stateful`?
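One way to read the diff above: Hive's `UDFType` annotation has separate `deterministic()` (default true) and `stateful()` (default false) flags. As far as I can tell from the Hive documentation, a stateful UDF keeps state across the rows it has processed (row_sequence is the usual example) and is meant to be treated as non-deterministic regardless of what `deterministic()` returns, which is what the added `!udfType.stateful()` enforces. The sketch below models the annotation with a hypothetical case class `UdfTypeInfo` (and `None` for a missing annotation) to contrast the check before and after the patch; `UdfDeterminism` is also a hypothetical name.

```scala
// Hypothetical stand-in for Hive's UDFType annotation, using its default values.
final case class UdfTypeInfo(deterministic: Boolean = true, stateful: Boolean = false)

object UdfDeterminism {
  // Before the patch: `udfType != null && udfType.deterministic()`.
  def isDeterministicBefore(udfType: Option[UdfTypeInfo]): Boolean =
    udfType.exists(_.deterministic)

  // After the patch: a stateful UDF is never treated as deterministic.
  def isDeterministicAfter(udfType: Option[UdfTypeInfo]): Boolean =
    udfType.exists(t => t.deterministic && !t.stateful)

  def main(args: Array[String]): Unit = {
    val statefulUdf = Some(UdfTypeInfo(stateful = true)) // deterministic() left at its default
    println(isDeterministicBefore(statefulUdf)) // true: eligible for constant folding
    println(isDeterministicAfter(statefulUdf))  // false: evaluated for every row
  }
}
```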





[GitHub] spark pull request #16068: [SPARK-18637][SQL]Stateful UDF should be consider...

2016-12-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16068#discussion_r90752814
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---
@@ -487,6 +488,29 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     assert(count4 == 1)
     sql("DROP TABLE parquet_tmp")
   }
+
+  test("Hive Stateful UDF") {
+    sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
+    sql(s"CREATE TEMPORARY FUNCTION statelessUDF AS '${classOf[StatelessUDF].getName}'")
+    val testData = spark.sparkContext.parallelize(
+      (0 until 10) map(x => IntegerCaseClass(1)), 2).toDF()
+    testData.createOrReplaceTempView("inputTable")
+    val max1 =
+      sql("SELECT MAX(s) FROM (" +
+        "SELECT statefulUDF() as s FROM (SELECT i from inputTable DISTRIBUTE by i) a" +
+        ") b").head().getLong(0)
--- End diff --

How about using multiline string syntax here?
```Scala
sql(
  """
    |SELECT MAX(s) FROM
    |  (SELECT statefulUDF() as s FROM
    |    (SELECT i from inputTable DISTRIBUTE BY i) a
    |  ) b
  """.stripMargin)
```
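For reference, `stripMargin` removes each line's leading blanks up to and including the `|` character, so the SQL text keeps only the relative indentation written after the margin. A small check of that behavior, with `StripMarginDemo` as a hypothetical name:

```scala
object StripMarginDemo {
  // Leading blanks up to and including '|' are removed from each line;
  // the whitespace-only last line has no '|' and is kept unchanged.
  val query: String =
    """
      |SELECT MAX(s) FROM
      |  (SELECT statefulUDF() as s FROM
      |    (SELECT i from inputTable DISTRIBUTE BY i) a
      |  ) b
    """.stripMargin

  def main(args: Array[String]): Unit = println(query)
}
```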


